Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/25 16:26:34 UTC

[flink] branch master updated: [FLINK-13222][docs] Add documentation for failover strategy option

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ebe1480  [FLINK-13222][docs] Add documentation for failover strategy option
ebe1480 is described below

commit ebe14806b46d89c852aaccca5cec1ed3e3ead136
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Mon Jul 15 18:54:20 2019 +0800

    [FLINK-13222][docs] Add documentation for failover strategy option
    
    Add detailed failover strategies documentation to doc page
    
    Point links in ZH docs to ZH pages
    
    Use a relative path in docs link to flink pages
    
    This closes #9113.
---
 .../generated/job_manager_configuration.html       |  5 ++
 ...tart_strategies.md => task_failure_recovery.md} | 66 +++++++++++++++++++---
 ...trategies.zh.md => task_failure_recovery.zh.md} | 63 +++++++++++++++++++--
 docs/redirects/restart_strategies.md               | 24 ++++++++
 .../flink/configuration/JobManagerOptions.java     | 20 +++++--
 5 files changed, 161 insertions(+), 17 deletions(-)
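
Reader's note (commentary only, not part of the commit): the three rules for
choosing which regions to restart, documented in task_failure_recovery.md in
this patch, can be read as a graph traversal. The block below is a minimal Java
sketch under stated assumptions, not Flink's implementation. The names
RegionFailoverSketch and regionsToRestart and the map-based region model are
hypothetical and are not Flink classes or APIs. The sketch also simplifies
rule 2 by tracking lost results per producing region rather than per result
partition.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: a hypothetical model of the documented restart rules. */
final class RegionFailoverSketch {

    /**
     * @param failedRegion region containing the failed task (rule 1)
     * @param producers    region -> regions that produce its input results
     * @param consumers    region -> regions that consume its results
     * @param lostResults  producing regions whose results are unavailable
     *                     (assumption: availability tracked per region)
     */
    static Set<String> regionsToRestart(
            String failedRegion,
            Map<String, List<String>> producers,
            Map<String, List<String>> consumers,
            Set<String> lostResults) {
        Set<String> restart = new LinkedHashSet<>();
        Deque<String> pending = new ArrayDeque<>();
        pending.add(failedRegion); // rule 1: the failed task's region restarts
        while (!pending.isEmpty()) {
            String region = pending.poll();
            if (!restart.add(region)) {
                continue; // already scheduled for restart
            }
            // rule 2: a restarting region needs its inputs, so a producer
            // whose result is unavailable must restart as well
            for (String p : producers.getOrDefault(region, Collections.emptyList())) {
                if (lostResults.contains(p)) {
                    pending.add(p);
                }
            }
            // rule 3: all consumers of a restarting region restart too
            pending.addAll(consumers.getOrDefault(region, Collections.emptyList()));
        }
        return restart;
    }

    public static void main(String[] args) {
        // Hypothetical job with three regions chained by batch exchanges: A -> B -> C
        Map<String, List<String>> producers = new HashMap<>();
        producers.put("B", Arrays.asList("A"));
        producers.put("C", Arrays.asList("B"));
        Map<String, List<String>> consumers = new HashMap<>();
        consumers.put("A", Arrays.asList("B"));
        consumers.put("B", Arrays.asList("C"));

        // A's result is still available: only B and its consumer C restart.
        System.out.println(regionsToRestart(
                "B", producers, consumers, Collections.emptySet())); // prints [B, C]

        // A's result is lost: A has to be restarted as well.
        System.out.println(regionsToRestart(
                "B", producers, consumers, new HashSet<>(Arrays.asList("A")))); // prints [B, A, C]
    }
}
```

In this model a DataStream job, where all exchanges are pipelined, would form a
single region, so the traversal would degenerate to restarting everything.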

diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index 73477fe..b4ae08e 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -18,6 +18,11 @@
             <td>The maximum number of prior execution attempts kept in history.</td>
         </tr>
         <tr>
+            <td><h5>jobmanager.execution.failover-strategy</h5></td>
+            <td style="word-wrap: break-word;">"full"</td>
+            <td>This option specifies how the job computation recovers from task failures. Accepted values are:<ul><li>'full': Restarts all tasks to recover the job.</li><li>'region': Restarts all tasks that could be affected by the task failure. More details can be found <a href="../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy">here</a>.</li></ul></td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.heap.size</h5></td>
             <td style="word-wrap: break-word;">"1024m"</td>
             <td>JVM heap size for the JobManager.</td>
diff --git a/docs/dev/restart_strategies.md b/docs/dev/task_failure_recovery.md
similarity index 78%
rename from docs/dev/restart_strategies.md
rename to docs/dev/task_failure_recovery.md
index b06b27d..dbd059a 100644
--- a/docs/dev/restart_strategies.md
+++ b/docs/dev/task_failure_recovery.md
@@ -1,5 +1,5 @@
 ---
-title: "Restart Strategies"
+title: "Task Failure Recovery"
 nav-parent_id: execution
 nav-pos: 50
 ---
@@ -22,14 +22,19 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink supports different restart strategies which control how the jobs are restarted in case of a failure.
-The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
-In case that the job is submitted with a restart strategy, this strategy overrides the cluster's default setting.
+When a task failure happens, Flink needs to restart the failed task and other affected tasks to recover the job to a normal state.
+
+Restart strategies and failover strategies are used to control the task restarting.
+Restart strategies decide whether and when the failed/affected tasks can be restarted.
+Failover strategies decide which tasks should be restarted to recover the job.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Overview
+## Restart Strategies
+
+The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
+In case that the job is submitted with a restart strategy, this strategy overrides the cluster's default setting.
 
 The default restart strategy is set via Flink's configuration file `flink-conf.yaml`.
 The configuration parameter *restart-strategy* defines which strategy is taken.
@@ -94,8 +99,6 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
 </div>
 
 
-## Restart Strategies
-
 The following sections describe restart strategy specific configuration options.
 
 ### Fixed Delay Restart Strategy
@@ -264,4 +267,53 @@ The cluster defined restart strategy is used.
 This is helpful for streaming programs which enable checkpointing.
 By default, a fixed delay restart strategy is chosen if there is no other restart strategy defined.
 
+## Failover Strategies
+
+Flink supports different failover strategies which can be configured via the configuration parameter
+*jobmanager.execution.failover-strategy* in Flink's configuration file `flink-conf.yaml`.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 50%">Failover Strategy</th>
+      <th class="text-left">Value for jobmanager.execution.failover-strategy</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>Restart all</td>
+        <td>full</td>
+    </tr>
+    <tr>
+        <td>Restart pipelined region</td>
+        <td>region</td>
+    </tr>
+  </tbody>
+</table>
+
+### Restart All Failover Strategy
+
+This strategy restarts all tasks in the job to recover from a task failure.
+
+### Restart Pipelined Region Failover Strategy
+
+This strategy groups tasks into disjoint regions. When a task failure is detected, 
+this strategy computes the smallest set of regions that must be restarted to recover from the failure. 
+For some jobs, this restarts fewer tasks than the Restart All Failover Strategy.
+
+A region is a set of tasks that communicate via pipelined data exchanges. 
+That is, batch data exchanges denote the boundaries of a region.
+- All data exchanges in a DataStream job or Streaming Table/SQL job are pipelined.
+- All data exchanges in a Batch Table/SQL job are batched by default.
+- The data exchange types in a DataSet job are determined by the 
+  [ExecutionMode]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/ExecutionMode.html) 
+  which can be set through [ExecutionConfig]({{ site.baseurl }}/dev/execution_configuration.html).
+
+The regions to restart are decided as below:
+1. The region containing the failed task will be restarted.
+2. If a result partition is not available while it is required by a region that will be restarted,
+   the region producing the result partition will be restarted as well.
+3. If a region is to be restarted, all of its consumer regions will also be restarted. This is to guarantee
+   data consistency because nondeterministic processing or partitioning can result in different partitions.
+
 {% top %}
diff --git a/docs/dev/restart_strategies.zh.md b/docs/dev/task_failure_recovery.zh.md
similarity index 73%
rename from docs/dev/restart_strategies.zh.md
rename to docs/dev/task_failure_recovery.zh.md
index 043b8f0..e67fb6e 100644
--- a/docs/dev/restart_strategies.zh.md
+++ b/docs/dev/task_failure_recovery.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "重启策略"
+title: "Task 故障恢复"
 nav-parent_id: execution
 nav-pos: 50
 ---
@@ -22,15 +22,21 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink 在作业发生故障时支持不同的重启策略。如果没有为作业定义重启策略,集群启动时就会遵循默认的重启策略。
-如果提交作业时设置了重启策略,该策略将覆盖掉集群的默认策略。
+当 Task 发生故障时,Flink 需要重启出错的 Task 以及其他受到影响的 Task ,以使得作业恢复到正常执行状态。
+
+Flink 通过重启策略和故障恢复策略来控制 Task 重启:重启策略决定是否可以重启以及重启的间隔;故障恢复策略决定哪些 Task 需要重启。
 
 * This will be replaced by the TOC
 {:toc}
 
-## 概述
+## 重启策略
 
-通过 Flink 的配置文件 `flink-conf.yaml` 来设置默认的重启策略。配置参数 *restart-strategy* 定义了采取何种策略。如果没有启用 checkpoint,就采用“不重启”策略。如果启用了 checkpoint 且没有配置重启策略,那么就采用固定延时重启策略,此时最大尝试重启次数由 `Integer.MAX_VALUE` 参数设置。下表列出了可用的重启策略和与其对应的配置值。
+Flink 作业如果没有定义重启策略,则会遵循集群启动时加载的默认重启策略。
+如果提交作业时设置了重启策略,该策略将覆盖掉集群的默认策略。
+
+通过 Flink 的配置文件 `flink-conf.yaml` 来设置默认的重启策略。配置参数 *restart-strategy* 定义了采取何种策略。
+如果没有启用 checkpoint,就采用“不重启”策略。如果启用了 checkpoint 且没有配置重启策略,那么就采用固定延时重启策略,
+此时最大尝试重启次数由 `Integer.MAX_VALUE` 参数设置。下表列出了可用的重启策略和与其对应的配置值。
 
 每个重启策略都有自己的一组配置参数来控制其行为。
 这些参数也在配置文件中设置。
@@ -88,7 +94,6 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
 </div>
 
 
-## 重启策略
 
 以下部分详细描述重启策略的配置项。
 
@@ -260,4 +265,50 @@ env.setRestartStrategy(RestartStrategies.noRestart())
 这对于启用了 checkpoint 的流处理程序很有帮助。
 如果没有定义其他重启策略,默认选择固定延时重启策略。
 
+## 故障恢复策略
+
+Flink 支持多种不同的故障恢复策略,该策略需要通过 Flink 配置文件 `flink-conf.yaml` 中的 *jobmanager.execution.failover-strategy*
+配置项进行配置。
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 50%">故障恢复策略</th>
+      <th class="text-left">jobmanager.execution.failover-strategy 配置值</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>全图重启</td>
+        <td>full</td>
+    </tr>
+    <tr>
+        <td>基于 Region 的局部重启</td>
+        <td>region</td>
+    </tr>
+  </tbody>
+</table>
+
+### 全图重启故障恢复策略
+
+在全图重启故障恢复策略下,Task 发生故障时会重启作业中的所有 Task 进行故障恢复。
+
+### 基于 Region 的局部重启故障恢复策略
+
+该策略会将作业中的所有 Task 划分为数个 Region。当有 Task 发生故障时,它会尝试找出进行故障恢复需要重启的最小 Region 集合。
+相比于全局重启故障恢复策略,这种策略在一些场景下的故障恢复需要重启的 Task 会更少。
+
+此处 Region 指以 Pipelined 形式进行数据交换的 Task 集合。也就是说,Batch 形式的数据交换会构成 Region 的边界。
+- DataStream 和 流式 Table/SQL 作业的所有数据交换都是 Pipelined 形式的。
+- 批处理式 Table/SQL 作业的所有数据交换默认都是 Batch 形式的。
+- DataSet 作业中的数据交换形式会根据 [ExecutionConfig]({{ site.baseurl }}/zh/dev/execution_configuration.html) 
+  中配置的 [ExecutionMode]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/ExecutionMode.html)
+  决定。
+
+需要重启的 Region 的判断逻辑如下:
+1. 出错 Task 所在 Region 需要重启。
+2. 如果要重启的 Region 需要消费的数据有部分无法访问(丢失或损坏),产出该部分数据的 Region 也需要重启。
+3. 需要重启的 Region 的下游 Region 也需要重启。这是出于保障数据一致性的考虑,因为一些非确定性的计算或者分发会导致同一个
+   Result Partition 每次产生时包含的数据都不相同。
+
 {% top %}
diff --git a/docs/redirects/restart_strategies.md b/docs/redirects/restart_strategies.md
new file mode 100644
index 0000000..7eef69e
--- /dev/null
+++ b/docs/redirects/restart_strategies.md
@@ -0,0 +1,24 @@
+---
+title: "Restart Strategies"
+layout: redirect
+redirect: /dev/task_failure_recovery.html
+permalink: /dev/restart_strategies.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 75883ba..e062829 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.text;
 
 /**
@@ -105,8 +106,16 @@ public class JobManagerOptions {
 
 	/**
 	 * This option specifies the failover strategy, i.e. how the job computation recovers from task failures.
+	 *
+	 * <p>The options "individual" and "region-legacy" are intentionally not included
+	 * as they have some known limitations or issues:
+	 * <ul>
+	 *     <li>"individual" strategy only works when no tasks are connected to each other, in which case the
+	 * "region" failover strategy would also restart failed tasks individually.
+	 *     <li>"region-legacy" strategy is not able to backtrack missing input result partitions.
+	 * </ul>
+	 * The new "region" strategy supersedes "individual" and "region-legacy" strategies and should always work.
 	 */
-	@Documentation.ExcludeFromDocumentation("The failover strategy feature is highly experimental.")
 	public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
 		key("jobmanager.execution.failover-strategy")
 			.defaultValue("full")
@@ -114,9 +123,12 @@ public class JobManagerOptions {
 				.text("This option specifies how the job computation recovers from task failures. " +
 					"Accepted values are:")
 				.list(
-					text("'full': Restarts all tasks."),
-					text("'individual': Restarts only the failed task. Should only be used if all tasks are independent components."),
-					text("'region': Restarts all tasks that could be affected by the task failure.")
+					text("'full': Restarts all tasks to recover the job."),
+					text("'region': Restarts all tasks that could be affected by the task failure. " +
+						"More details can be found %s.",
+						link(
+							"../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy",
+							"here"))
 				).build());
 
 	/**