You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 05:17:44 UTC

[flink] branch master updated: [FLINK-13227][docs-zh] Translate "asyncio" page into Chinese

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7517b98  [FLINK-13227][docs-zh] Translate "asyncio" page into Chinese
7517b98 is described below

commit 7517b98bac50d353ab0ab41134ca0579d9ec5b93
Author: YngwieWang <yn...@gmail.com>
AuthorDate: Tue Jun 11 14:27:35 2019 +0800

    [FLINK-13227][docs-zh] Translate "asyncio" page into Chinese
    
    This closes #9150
---
 docs/dev/stream/operators/asyncio.zh.md | 188 +++++++++++++-------------------
 1 file changed, 78 insertions(+), 110 deletions(-)

diff --git a/docs/dev/stream/operators/asyncio.zh.md b/docs/dev/stream/operators/asyncio.zh.md
index d51f5d0..6a6f707 100644
--- a/docs/dev/stream/operators/asyncio.zh.md
+++ b/docs/dev/stream/operators/asyncio.zh.md
@@ -26,71 +26,55 @@ under the License.
 * ToC
 {:toc}
 
-This page explains the use of Flink's API for asynchronous I/O with external data stores.
-For users not familiar with asynchronous or event-driven programming, an article about Futures and
-event-driven programming may be useful preparation.
+本文讲解 Flink 用于访问外部数据存储的异步 I/O API。
+对于不熟悉异步或者事件驱动编程的用户,建议先储备一些关于 Future 和事件驱动编程的知识。
 
-Note: Details about the design and implementation of the asynchronous I/O utility can be found in the proposal and design document
-[FLIP-12: Asynchronous I/O Design and Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673).
+提示:这篇文档 [FLIP-12: 异步 I/O 的设计和实现](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673)介绍了关于设计和实现异步 I/O 功能的细节。
 
+## 对于异步 I/O 操作的需求
 
-## The need for Asynchronous I/O Operations
+在与外部系统交互(用数据库中的数据扩充流数据)的时候,需要考虑与外部系统的通信延迟对整个流处理应用的影响。
 
-When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care
-that communication delay with the external system does not dominate the streaming application's total work.
+简单地访问外部数据库的数据,比如使用 `MapFunction`,通常意味着**同步**交互:
+`MapFunction` 向数据库发送一个请求然后一直等待,直到收到响应。在许多情况下,等待占据了函数运行的大部分时间。
 
-Naively accessing data in the external database, for example in a `MapFunction`, typically means **synchronous** interaction:
-A request is sent to the database and the `MapFunction` waits until the response has been received. In many cases, this waiting
-makes up the vast majority of the function's time.
-
-Asynchronous interaction with the database means that a single parallel function instance can handle many requests concurrently and
-receive the responses concurrently. That way, the waiting time can be overlayed with sending other requests and
-receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cased to much higher
-streaming throughput.
+与数据库异步交互是指一个并行函数实例可以并发地处理多个请求和接收多个响应。这样,函数在等待的时间可以发送其他请求和接收其他响应。至少等待的时间可以被多个请求摊分。大多数情况下,异步交互可以大幅度提高流处理的吞吐量。
 
 <img src="{{ site.baseurl }}/fig/async_io.svg" class="center" width="50%" />
 
-*Note:* Improving throughput by just scaling the `MapFunction` to a very high parallelism is in some cases possible as well, but usually
-comes at a very high resource cost: Having many more parallel MapFunction instances means more tasks, threads, Flink-internal network
-connections, network connections to the database, buffers, and general internal bookkeeping overhead.
+*注意:*仅仅提高 `MapFunction` 的并行度(parallelism)在有些情况下也可以提升吞吐量,但是这样做通常会导致非常高的资源消耗:更多的并行 `MapFunction` 实例意味着更多的 Task、更多的线程、更多的 Flink 内部网络连接、 更多的与数据库的网络连接、更多的缓冲和更多程序内部协调的开销。
 
 
-## Prerequisites
+## 先决条件
 
-As illustrated in the section above, implementing proper asynchronous I/O to a database (or key/value store) requires a client
-to that database that supports asynchronous requests. Many popular databases offer such a client.
+如上节所述,正确地实现数据库(或键/值存储)的异步 I/O 交互需要支持异步请求的数据库客户端。许多主流数据库都提供了这样的客户端。
 
-In the absence of such a client, one can try and turn a synchronous client into a limited concurrent client by creating
-multiple clients and handling the synchronous calls with a thread pool. However, this approach is usually less
-efficient than a proper asynchronous client.
+如果没有这样的客户端,可以通过创建多个客户端并使用线程池处理同步调用的方法,将同步客户端转换为有限并发的客户端。然而,这种方法通常比正规的异步客户端效率低。
 
 
-## Async I/O API
+## 异步 I/O API
 
-Flink's Async I/O API allows users to use asynchronous request clients with data streams. The API handles the integration with
-data streams, well as handling order, event time, fault tolerance, etc.
+Flink 的异步 I/O API 允许用户在流处理中使用异步请求客户端。API 处理与数据流的集成,同时还能处理好顺序、事件时间和容错等。
 
-Assuming one has an asynchronous client for the target database, three parts are needed to implement a stream transformation
-with asynchronous I/O against the database:
+在具备异步数据库客户端的基础上,实现数据流转换操作与数据库的异步 I/O 交互需要以下三部分:
 
-  - An implementation of `AsyncFunction` that dispatches the requests
-  - A *callback* that takes the result of the operation and hands it to the `ResultFuture`
-  - Applying the async I/O operation on a DataStream as a transformation
+- 实现分发请求的 `AsyncFunction`
+- 获取数据库交互的结果并发送给 `ResultFuture` 的 *回调* 函数
+- 将异步 I/O 操作应用于 `DataStream` 作为 `DataStream` 的一次转换操作。
 
-The following code example illustrates the basic pattern:
+下面是基本的代码模板:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// This example implements the asynchronous request and callback with Futures that have the
-// interface of Java 8's futures (which is the same one followed by Flink's Future)
+// 这个例子使用 Java 8 的 Future 接口(与 Flink 的 Future 相同)实现了异步请求和回调。
 
 /**
- * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
+ * 实现 'AsyncFunction' 用于发送请求和设置回调。
  */
 class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
 
-    /** The database specific client that can issue concurrent requests with callbacks */
+    /** 能够利用回调函数并发发送请求的数据库客户端 */
     private transient DatabaseClient client;
 
     @Override
@@ -106,11 +90,11 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
     @Override
     public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
 
-        // issue the asynchronous request, receive a future for result
+        // 发送异步请求,接收 future 结果
         final Future<String> result = client.query(key);
 
-        // set the callback to be executed once the request by the client is complete
-        // the callback simply forwards the result to the result future
+        // 设置客户端完成请求后要执行的回调函数
+        // 回调函数只是简单地把结果发给 future
         CompletableFuture.supplyAsync(new Supplier<String>() {
 
             @Override
@@ -118,7 +102,7 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
                 try {
                     return result.get();
                 } catch (InterruptedException | ExecutionException e) {
-                    // Normally handled explicitly.
+                    // 显示地处理异常。
                     return null;
                 }
             }
@@ -128,10 +112,10 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
     }
 }
 
-// create the original stream
+// 创建初始 DataStream
 DataStream<String> stream = ...;
 
-// apply the async I/O transformation
+// 应用异步 I/O 转换操作
 DataStream<Tuple2<String, String>> resultStream =
     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
 
@@ -140,34 +124,34 @@ DataStream<Tuple2<String, String>> resultStream =
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 /**
- * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
+ * 实现 'AsyncFunction' 用于发送请求和设置回调。
  */
 class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
 
-    /** The database specific client that can issue concurrent requests with callbacks */
+    /** 能够利用回调函数并发发送请求的数据库客户端 */
     lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)
 
-    /** The context used for the future callbacks */
+    /** 用于 future 回调的上下文环境 */
     implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
 
 
     override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
 
-        // issue the asynchronous request, receive a future for the result
+        // 发送异步请求,接收 future 结果
         val resultFutureRequested: Future[String] = client.query(str)
 
-        // set the callback to be executed once the request by the client is complete
-        // the callback simply forwards the result to the result future
+        // 设置客户端完成请求后要执行的回调函数
+        // 回调函数只是简单地把结果发给 future
         resultFutureRequested.onSuccess {
             case result: String => resultFuture.complete(Iterable((str, result)))
         }
     }
 }
 
-// create the original stream
+// 创建初始 DataStream
 val stream: DataStream[String] = ...
 
-// apply the async I/O transformation
+// 应用异步 I/O 转换操作
 val resultStream: DataStream[(String, String)] =
     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
 
@@ -175,95 +159,79 @@ val resultStream: DataStream[(String, String)] =
 </div>
 </div>
 
-**Important note**: The `ResultFuture` is completed with the first call of `ResultFuture.complete`.
-All subsequent `complete` calls will be ignored.
+**重要提示**: 第一次调用 `ResultFuture.complete` 后 `ResultFuture` 就完成了。
+后续的 `complete` 调用都将被忽略。
 
-The following two parameters control the asynchronous operations:
+下面两个参数控制异步操作:
 
-  - **Timeout**: The timeout defines how long an asynchronous request may take before it is considered failed. This parameter
-    guards against dead/failed requests.
+  - **Timeout**: 超时参数定义了异步请求发出多久后未得到响应即被认定为失败。 它可以防止一直等待得不到响应的请求。
 
-  - **Capacity**: This parameter defines how many asynchronous requests may be in progress at the same time.
-    Even though the async I/O approach leads typically to much better throughput, the operator can still be the bottleneck in
-    the streaming application. Limiting the number of concurrent requests ensures that the operator will not
-    accumulate an ever-growing backlog of pending requests, but that it will trigger backpressure once the capacity
-    is exhausted.
+  - **Capacity**: 容量参数定义了可以同时进行的异步请求数。
+    即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O 操作的算子仍然可能成为流处理的瓶颈。 限制并发请求的数量可以确保算子不会持续累积待处理的请求进而造成积压,而是在容量耗尽时触发反压。
 
 
-### Timeout Handling
+### 超时处理
 
-When an async I/O request times out, by default an exception is thrown and job is restarted.
-If you want to handle timeouts, you can override the `AsyncFunction#timeout` method.
+当异步 I/O 请求超时的时候,默认会抛出异常并重启作业。
+如果你想处理超时,可以重写 `AsyncFunction#timeout` 方法。
 
+### 结果的顺序
 
-### Order of Results
+`AsyncFunction` 发出的并发请求经常以不确定的顺序完成,这取决于请求得到响应的顺序。
+Flink 提供两种模式控制结果记录以何种顺序发出。
 
-The concurrent requests issued by the `AsyncFunction` frequently complete in some undefined order, based on which request finished first.
-To control in which order the resulting records are emitted, Flink offers two modes:
+  - **无序模式**: 异步请求一结束就立刻发出结果记录。
+    流中记录的顺序在经过异步 I/O 算子之后发生了改变。
+    当使用 *处理时间* 作为基本时间特征时,这个模式具有最低的延迟和最少的开销。
+    此模式使用 `AsyncDataStream.unorderedWait(...)` 方法。
 
-  - **Unordered**: Result records are emitted as soon as the asynchronous request finishes.
-    The order of the records in the stream is different after the async I/O operator than before.
-    This mode has the lowest latency and lowest overhead, when used with *processing time* as the basic time characteristic.
-    Use `AsyncDataStream.unorderedWait(...)` for this mode.
+  - **有序模式**: 这种模式保持了流的顺序。发出结果记录的顺序与触发异步请求的顺序(记录输入算子的顺序)相同。为了实现这一点,算子将缓冲一个结果记录直到这条记录前面的所有记录都发出(或超时)。由于记录或者结果要在 checkpoint 的状态中保存更长的时间,所以与无序模式相比,有序模式通常会带来一些额外的延迟和 checkpoint 开销。此模式使用 `AsyncDataStream.orderedWait(...)` 方法。
 
-  - **Ordered**: In that case, the stream order is preserved. Result records are emitted in the same order as the asynchronous
-    requests are triggered (the order of the operators input records). To achieve that, the operator buffers a result record
-    until all its preceding records are emitted (or timed out).
-    This usually introduces some amount of extra latency and some overhead in checkpointing, because records or results are maintained
-    in the checkpointed state for a longer time, compared to the unordered mode.
-    Use `AsyncDataStream.orderedWait(...)` for this mode.
 
+### 事件时间
 
-### Event Time
+当流处理应用使用[事件时间]({{ site.baseurl }}/zh/dev/event_time.html)时,异步 I/O 算子会正确处理 watermark。对于两种顺序模式,这意味着以下内容:
 
-When the streaming application works with [event time]({{ site.baseurl }}/dev/event_time.html), watermarks will be handled correctly by the
-asynchronous I/O operator. That means concretely the following for the two order modes:
+  - **无序模式**: Watermark 既不超前于记录也不落后于记录,即 watermark 建立了*顺序的边界*。
+    只有连续两个 watermark 之间的记录是无序发出的。
+    在一个 watermark 后面生成的记录只会在这个 watermark 发出以后才发出。
+    在一个 watermark 之前的所有输入的结果记录全部发出以后,才会发出这个 watermark。
 
-  - **Unordered**: Watermarks do not overtake records and vice versa, meaning watermarks establish an *order boundary*.
-    Records are emitted unordered only between watermarks.
-    A record occurring after a certain watermark will be emitted only after that watermark was emitted.
-    The watermark in turn will be emitted only after all result records from inputs before that watermark were emitted.
+    这意味着存在 watermark 的情况下,*无序模式* 会引入一些与*有序模式* 相同的延迟和管理开销。开销大小取决于 watermark 的频率。
 
-    That means that in the presence of watermarks, the *unordered* mode introduces some of the same latency and management
-    overhead as the *ordered* mode does. The amount of that overhead depends on the watermark frequency.
+  - **有序模式**: 连续两个 watermark 之间的记录顺序也被保留了。开销与使用*处理时间* 相比,没有显著的差别。
+    
 
-  - **Ordered**: Order of watermarks an records is preserved, just like order between records is preserved. There is no
-    significant change in overhead, compared to working with *processing time*.
+请记住,*摄入时间* 是一种特殊的*事件时间*,它基于数据源的处理时间自动生成 watermark。
 
-Please recall that *Ingestion Time* is a special case of *event time* with automatically generated watermarks that
-are based on the sources processing time.
 
+### 容错保证
 
-### Fault Tolerance Guarantees
+异步 I/O 算子提供了完全的精确一次容错保证。它将在途的异步请求的记录保存在 checkpoint 中,在故障恢复时重新触发请求。
 
-The asynchronous I/O operator offers full exactly-once fault tolerance guarantees. It stores the records for in-flight
-asynchronous requests in checkpoints and restores/re-triggers the requests when recovering from a failure.
 
+### 实现提示
 
-### Implementation Tips
+在实现使用 *Executor*(或者 Scala 中的 *ExecutionContext*)和回调的 *Futures* 时,建议使用 `DirectExecutor`,因为通常回调的工作量很小,`DirectExecutor` 避免了额外的线程切换开销。回调通常只是把结果发送给 `ResultFuture`,也就是把它添加进输出缓冲。从这里开始,包括发送记录和与 chenkpoint 交互在内的繁重逻辑都将在专有的线程池中进行处理。
 
-For implementations with *Futures* that have an *Executor* (or *ExecutionContext* in Scala) for callbacks, we suggests to use a `DirectExecutor`, because the
-callback typically does minimal work, and a `DirectExecutor` avoids an additional thread-to-thread handover overhead. The callback typically only hands
-the result to the `ResultFuture`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
-with the checkpoint bookkeeping happens in a dedicated thread-pool anyways.
+`DirectExecutor` 可以通过 `org.apache.flink.runtime.concurrent.Executors.directExecutor()` 或
+`com.google.common.util.concurrent.MoreExecutors.directExecutor()` 获得。
 
-A `DirectExecutor` can be obtained via `org.apache.flink.runtime.concurrent.Executors.directExecutor()` or
-`com.google.common.util.concurrent.MoreExecutors.directExecutor()`.
 
+### 警告
 
-### Caveat
+**Flink 不以多线程方式调用 AsyncFunction**
 
-**The AsyncFunction is not called Multi-Threaded**
+我们想在这里明确指出一个经常混淆的地方:`AsyncFunction` 不是以多线程方式调用的。
+只有一个 `AsyncFunction` 实例,它被流中相应分区内的每个记录顺序地调用。除非 `asyncInvoke(...)` 方法快速返回并且依赖于(客户端的)回调, 否则无法实现正确的异步 I/O。
 
-A common confusion that we want to explicitly point out here is that the `AsyncFunction` is not called in a multi-threaded fashion.
-There exists only one instance of the `AsyncFunction` and it is called sequentially for each record in the respective partition
-of the stream. Unless the `asyncInvoke(...)` method returns fast and relies on a callback (by the client), it will not result in
-proper asynchronous I/O.
+例如,以下情况导致阻塞的 `asyncInvoke(...)` 函数,从而使异步行为无效:
 
-For example, the following patterns result in a blocking `asyncInvoke(...)` functions and thus void the asynchronous behavior:
+  - 使用同步数据库客户端,它的查询方法调用在返回结果前一直被阻塞。
+  - 在 `asyncInvoke(...)` 方法内阻塞等待异步客户端返回的 future 类型对象
 
-  - Using a database client whose lookup/query method call blocks until the result has been received back
+**目前,出于一致性的原因,AsyncFunction 的算子(异步等待算子)必须位于算子链的头部**
 
-  - Blocking/waiting on the future-type objects returned by an asynchronous client inside the `asyncInvoke(...)` method
+根据 `FLINK-13063` 给出的原因,目前我们必须断开异步等待算子的算子链以防止潜在的一致性问题。这改变了先前支持的算子链的行为。需要旧有行为并接受可能违反一致性保证的用户可以实例化并手工将异步等待算子添加到作业图中并将链策略设置回通过异步等待算子的 `ChainingStrategy.ALWAYS` 方法进行链接。
 
 {% top %}