You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/30 19:21:01 UTC

[5/7] flink git commit: [FLINK-3083] [docs] Add docs on how to configure streaming fault tolerance.

[FLINK-3083] [docs] Add docs on how to configure streaming fault tolerance.

This closes #1413


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf913476
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf913476
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf913476

Branch: refs/heads/master
Commit: cf913476965051d2ca38f3e95a84246bb7de712e
Parents: 2b358cd
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 26 16:45:45 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 30 17:44:12 2015 +0100

----------------------------------------------------------------------
 docs/_includes/navbar.html   |   3 +-
 docs/apis/fault_tolerance.md | 265 ++++++++++++++++++++++++++++++++++++++
 docs/apis/streaming_guide.md | 131 +------------------
 3 files changed, 268 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf913476/docs/_includes/navbar.html
----------------------------------------------------------------------
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index 62bdce8..c565feb 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -81,8 +81,9 @@ under the License.
                 <li><a href="{{ apis }}/python.html">Python API <span class="badge">Beta</span></a></li>
 
                 <li class="divider"></li>
-                <li><a href="{{ apis }}/scala_shell.html">Interactive Scala Shell</a></li>
+                <li><a href="{{ apis }}/fault_tolerance.html">Fault Tolerance</a></li>
                 <li><a href="{{ apis }}/state_backends.html">State in Streaming Programs</a></li>
+                <li><a href="{{ apis }}/scala_shell.html">Interactive Scala Shell</a></li>
                 <li><a href="{{ apis }}/dataset_transformations.html">DataSet Transformations</a></li>
                 <li><a href="{{ apis }}/best_practices.html">Best Practices</a></li>
                 <li><a href="{{ apis }}/example_connectors.html">Connectors (DataSet API)</a></li>

http://git-wip-us.apache.org/repos/asf/flink/blob/cf913476/docs/apis/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/fault_tolerance.md b/docs/apis/fault_tolerance.md
new file mode 100644
index 0000000..677ff95
--- /dev/null
+++ b/docs/apis/fault_tolerance.md
@@ -0,0 +1,265 @@
+---
+title: "Fault Tolerance"
+is_beta: false
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<a href="#top"></a>
+
+Flink's fault tolerance mechanism recovers programs in the presence of failures and
+continues to execute them. Such failures include machine hardware failures, network failures,
+transient program failures, etc.
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Streaming Fault Tolerance (DataStream API)
+------------------------------------------
+
+Flink has a checkpointing mechanism that recovers streaming jobs after failues. The checkpointing mechanism requires a *persistent* (or *durable*) source that
+can be asked for prior records again (Apache Kafka is a good example of such a source).
+
+The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state (see [Working with State]({{ site.baseurl }}/apis/streaming_guide.html#working-with-state)) consistently to provide *exactly once* processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured [state backend]({{ site.baseurl }}/apis/state_backends.html).
+
+The [docs on streaming fault tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html) describe in detail the technique behind Flink's streaming fault tolerance mechanism.
+
+To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
+
+Other parameters for checkpointing include:
+
+- *Number of retries*: The `setNumberOfExecutionRerties()` method defines how many times the job is restarted after a failure.
+  When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.
+
+- *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
+  Exactly-once is preferrable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
+
+- *number of concurrent checkpoints*: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints and not make progress with processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints (100s of milliseconds) to re-process very little upon failures.
+
+- *checkpoint timeout*: The time after which a checkpoint-in-progress is aborted, if it did not complete until then.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000);
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig().setCheckpointTimeout(60000);
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000)
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig.setCheckpointTimeout(60000)
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+{% endhighlight %}
+</div>
+</div>
+
+
+### Fault Tolerance Guarantees of Data Sources and Sinks
+
+Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the 
+snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
+not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Source</th>
+      <th class="text-left" style="width: 25%">Guarantees</th>
+      <th class="text-left">Notes</th>
+    </tr>
+   </thead>
+   <tbody>
+        <tr>
+            <td>Apache Kafka</td>
+            <td>exactly once</td>
+            <td>Use the appropriate Kafka connector for your version</td>
+        </tr>
+        <tr>
+            <td>RabbitMQ</td>
+            <td>at most once (v 0.10) / exactly once (v 1.0) </td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>Twitter Streaming API</td>
+            <td>at most once</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>Collections</td>
+            <td>exactly once</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>Files</td>
+            <td>at least once</td>
+            <td>At failure the file will be read from the beginning</td>
+        </tr>
+        <tr>
+            <td>Sockets</td>
+            <td>at most once</td>
+            <td></td>
+        </tr>
+  </tbody>
+</table>
+
+To guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs
+to take part in the checkpointing mechanism. The following table lists the delivery guarantees (assuming exactly-once 
+state updates) of Flink coupled with bundled sinks:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Sink</th>
+      <th class="text-left" style="width: 25%">Guarantees</th>
+      <th class="text-left">Notes</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>HDFS rolling sink</td>
+        <td>exactly once</td>
+        <td>Implementation depends on Hadoop version</td>
+    </tr>
+    <tr>
+        <td>Elasticsearch</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+    <tr>
+        <td>Kafka producer</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+    <tr>
+        <td>File sinks</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+    <tr>
+        <td>Socket sinks</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+    <tr>
+        <td>Standard output</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+  </tbody>
+</table>
+
+[Back to top](#top)
+
+
+Batch Processing Fault Tolerance (DataSet API)
+----------------------------------------------
+
+Fault tolerance for programs in the *DataSet API* works by retrying failed executions.
+The number of time that Flink retries the execution before the job is declared as failed is configurable
+via the *execution retries* parameter. A value of *0* effectively means that fault tolerance is deactivated.
+
+To activate the fault tolerance, set the *execution retries* to a value larger than zero. A common choice is a value
+of three.
+
+This example shows how to configure the execution retries for a Flink DataSet program.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setNumberOfExecutionRetries(3);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setNumberOfExecutionRetries(3)
+{% endhighlight %}
+</div>
+</div>
+
+
+You can also define default values for the number of execution retries and the retry delay in the `flink-conf.yaml`:
+
+~~~
+execution-retries.default: 3
+~~~
+
+
+Retry Delays
+------------
+
+Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start
+immediately, but only after a certain delay.
+
+Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted.
+
+You can set the retry delay for each program as follows (the sample shows the DataStream API - the DataSet API works similarly):
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
+{% endhighlight %}
+</div>
+</div>
+
+You can also define the default value for the retry delay in the `flink-conf.yaml`:
+
+~~~
+execution-retries.delay: 10 s
+~~~
+
+[Back to top](#top)
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/cf913476/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 3bb597b..366de22 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -2889,136 +2889,7 @@ Execution Parameters
 
 ### Fault Tolerance
 
-Flink has a checkpointing mechanism that recovers streaming jobs after failues. The checkpointing mechanism requires a *persistent* or *durable* source that
-can be asked for prior records again (Apache Kafka is a good example of a durable source).
-
-The checkpointing mechanism stores the progress in the source as well as the user-defined state (see [Working with State](#working_with_state))
-consistently to provide *exactly once* processing guarantees.
-
-To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
-
-Other parameters for checkpointing include:
-
-- *Number of retries*: The `setNumberOfExecutionRerties()` method defines how many times the job is restarted after a failure.
-  When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.
-- *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
-  Exactly-once is preferrable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
-
-The [docs on streaming fault tolerance](../internals/stream_checkpointing.html) describe in detail the technique behind Flink's streaming fault tolerance mechanism.
-
-Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the 
-snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
-not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Source</th>
-      <th class="text-left" style="width: 25%">Guarantees</th>
-      <th class="text-left">Notes</th>
-    </tr>
-   </thead>
-   <tbody>
-        <tr>
-            <td>Apache Kafka</td>
-            <td>exactly once</td>
-            <td>Use the appropriate Kafka connector for your version</td>
-        </tr>
-        <tr>
-            <td>RabbitMQ</td>
-            <td>at most once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Twitter Streaming API</td>
-            <td>at most once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Collections</td>
-            <td>at most once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Files</td>
-            <td>at least once</td>
-            <td>At failure the file will be read from the beginning</td>
-        </tr>
-        <tr>
-            <td>Sockets</td>
-            <td>at most once</td>
-            <td></td>
-        </tr>
-  </tbody>
-</table>
-
-<!--
-| Source                | Strongest guarantees  | Notes |
-|-----------------------|-----------------------|-------|
-| Apache Kafka          | exactly once          | Use the appropriate Kafka connector |
-| RabbitMQ              | at most once          | |
-| Twitter Streaming API | at most once          | |
-| Collection sources    | at most once          | |
-| File sources          | at least once         | Restarts from beginning of the file |
-| Socket sources        | at most once          | |
--->
-
-To guarantee end-to-end exactly-once record delivery (in addition to exactly-once updates), the data sink needs
-to take part in the snapshotting mechanism. The following table lists the delivery guarantees (assuming exactly-once 
-state updates) of Flink coupled with bundled sinks:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Sink</th>
-      <th class="text-left" style="width: 25%">Guarantees</th>
-      <th class="text-left">Notes</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>HDFS rolling sink</td>
-        <td>exactly once</td>
-        <td>Implementation depends on Hadoop version</td>
-    </tr>
-    <tr>
-        <td>Elasticsearch</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Kafka producer</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>    
-    <tr>
-        <td>File sinks</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Socket sinks</td>
-        <td>at lest once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Standard output</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-  </tbody>
-</table>
-
-
-<!--
-| Sink                  | Strongest guarantees  | Notes |
-|-----------------------|-----------------------|-------|
-| HDFS rolling sink     | exactly once          | Implementation depends on Hadoop version |
-| Elasticsearch         | at least once         | Duplicates need to be handled in Elasticsearch
-| File sinks            | at least once         | |
-| Socket sinks          | at least once         | |
-| Standard output       | at least once         | |
--->
+The [Fault Tolerance Documentation]({{ site.baseurl }}/apis/fault_tolerance.html) describes the options and parameters to enable and configure Flink's checkpointing mechanism.
 
 ### Parallelism