Posted to commits@flink.apache.org by ja...@apache.org on 2019/05/06 02:17:34 UTC

[flink] branch master updated: [FLINK-11633][docs-zh] Translate "Working with state" page into Chinese

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 906d593  [FLINK-11633][docs-zh] Translate "Working with state" page into Chinese
906d593 is described below

commit 906d593ae21c98e335e0a078fe37d6d295e6f993
Author: klion26 <qc...@gmail.com>
AuthorDate: Tue Apr 23 19:31:35 2019 +0800

    [FLINK-11633][docs-zh] Translate "Working with state" page into Chinese
    
    This closes #8341
---
 docs/dev/stream/state/state.md    |   4 +-
 docs/dev/stream/state/state.zh.md | 388 ++++++++++++++------------------------
 2 files changed, 142 insertions(+), 250 deletions(-)

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index be748d4..a128296 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -417,7 +417,7 @@ This feature can be activated in `StateTtlConfig`:
 import org.apache.flink.api.common.state.StateTtlConfig;
  StateTtlConfig ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupIncrementally()
+    .cleanupIncrementally(10, true)
     .build();
 {% endhighlight %}
 </div>
@@ -426,7 +426,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.StateTtlConfig
 val ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupIncrementally
+    .cleanupIncrementally(10, true)
     .build
 {% endhighlight %}
 </div>
diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md
index 0ff75b0..37566ee 100644
--- a/docs/dev/stream/state/state.zh.md
+++ b/docs/dev/stream/state/state.zh.md
@@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document explains how to use Flink's state abstractions when developing an application.
-
-* ToC
+This document explains how to use state when developing Flink jobs.
+
+* ToC
 {:toc}
 
-## Keyed State and Operator State
+## Keyed State and Operator State
 
-There are two basic kinds of state in Flink: `Keyed State` and `Operator State`.
+There are two basic kinds of state in Flink: `Keyed State` and `Operator State`.
 
 ### Keyed State
 
-*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`.
+*Keyed State* is always scoped to keys and can only be used in functions and operators on a `KeyedStream`.
 
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of <parallel-operator-instance, key>, and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as <operator, key>.
+You can think of Keyed State as Operator State that has been partitioned, or sharded, with each key living in exactly one partition.
+Logically, each keyed-state is bound to a unique tuple of <parallel-operator-instance, key>, and since each key
+"belongs" to exactly one parallel instance of the operator, this simplifies to <operator, key>.
 
-Keyed State is further organized into so-called *Key Groups*. Key Groups are the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State is further organized into so-called *Key Groups*. A Key Group is the smallest unit by which Flink redistributes Keyed State;
+there are exactly as many Key Groups as the job's maximum parallelism. During execution, each parallel instance of a keyed operator works with one or more Key Groups.
 
 ### Operator State
 
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
+For *Operator State* (or *non-keyed state*), each piece of operator state is bound to one parallel operator instance.
+The [Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) is a good example of operator state in Flink:
+each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its operator state.
+
+When the parallelism of a Flink job changes, operator state is redistributed among the parallel instances, using a different scheme than Keyed State.
 
-The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.
+## Raw State and Managed State
 
-## Raw and Managed State
+*Keyed State* and *Operator State* each come in two forms: *managed* and *raw*.
 
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables or RocksDB.
+Examples are "ValueState", "ListState", etc. The Flink runtime encodes these states and writes them into the checkpoints.
 
-*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes
-the states and writes them into the checkpoints.
+*Raw State* is state that operators keep in their own data structures. When checkpointed, Flink knows nothing about the state's content and only writes a sequence of bytes into the checkpoint.
 
-*Raw State* is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into
-the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes.
+All datastream functions can use managed state, whereas raw state can only be used when implementing operators.
+Using managed state (rather than raw state) is recommended, since Flink can redistribute it better when the parallelism changes and can also manage memory better.
 
-All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators.
-Using managed state (rather than raw state) is recommended, since with
-managed state Flink is able to automatically redistribute state when the parallelism is
-changed, and also do better memory management.
+<span class="label label-danger">Attention</span> If your managed state needs custom serialization logic,
+please see the [corresponding guide](custom_serialization.html) in order to ensure future compatibility. Flink's default serializers don't need special treatment.
 
-<span class="label label-danger">Attention</span> If your managed state needs custom serialization logic, please see 
-the [corresponding guide](custom_serialization.html) in order to ensure future compatibility. Flink's default serializers 
-don't need special treatment.
+## Using Managed Keyed State
 
-## Using Managed Keyed State
+The managed keyed state interface provides access to different types of state that are all scoped to the key of the current input element. In other words, this type of state can only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(...)`.
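+
+For example (a minimal sketch; `env` is an assumed `StreamExecutionEnvironment` and the key field is illustrative):
+
+{% highlight java %}
+// key a stream of (word, count) pairs by the first tuple field
+DataStream<Tuple2<String, Long>> words = env.fromElements(Tuple2.of("hello", 1L), Tuple2.of("world", 1L));
+KeyedStream<Tuple2<String, Long>, Tuple> keyed = words.keyBy(0);
+{% endhighlight %}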
 
-The managed keyed state interface provides access to different types of state that are all scoped to
-the key of the current input element. This means that this type of state can only be used
-on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
+Next, we will look at the different types of state and then see how they can be used in a program. The available state primitives are:
 
-Now, we will first look at the different types of state available and then we will see
-how they can be used in a program. The available state primitives are:
+* `ValueState<T>`: keeps a value that can be updated and retrieved (scoped to the key of the current input element as mentioned above, so there may be one value for each key that the operator sees).
+The value can be updated with `update(T)` and retrieved with `T value()`.
 
-* `ValueState<T>`: This keeps a value that can be updated and
-retrieved (scoped to key of the input element as mentioned above, so there will possibly be one value
-for each key that the operation sees). The value can be set using `update(T)` and retrieved using
-`T value()`.
 
-* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable`
-over all currently stored elements. Elements are added using `add(T)` or `addAll(List<T>)`, the Iterable can
-be retrieved using `Iterable<T> get()`. You can also override the existing list with `update(List<T>)`
+* `ListState<T>`: keeps a list of elements. You can append elements and retrieve the currently stored list. Elements are added with
+ `add(T)` or `addAll(List<T>)`, the whole list is retrieved with `Iterable<T> get()`, and the existing list can be overridden with `update(List<T>)`.
 
-* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values
-added to the state. The interface is similar to `ListState` but elements added using
-`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
+* `ReducingState<T>`: keeps a single value that represents the aggregation of all values added to the state. The interface is similar to `ListState`, but elements added with `add(T)` are reduced to an aggregate using the provided `ReduceFunction`.
 
-* `AggregatingState<IN, OUT>`: This keeps a single value that represents the aggregation of all values
-added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
-of elements that are added to the state. The interface is the same as for `ListState` but elements
-added using `add(IN)` are aggregated using a specified `AggregateFunction`.
+* `AggregatingState<IN, OUT>`: keeps a single value that represents the aggregation of all values added to the state. Contrary to `ReducingState`, the aggregate type may differ from the type of the elements added to the state.
+The interface is similar to `ListState`, but elements added with `add(IN)` are aggregated using the specified `AggregateFunction`.
 
-* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values
-added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
-of elements that are added to the state. The interface is similar to `ListState` but elements
-added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
+* `FoldingState<T, ACC>`: keeps a single value that represents the aggregation of all values added to the state. Contrary to `ReducingState`, the aggregate type may differ from the type of the elements added to the state.
+The interface is similar to `ListState`, but elements added with `add(T)` are folded into an aggregate using the specified `FoldFunction`.
 
-* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
-retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
-`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
-views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
+* `MapState<UK, UV>`: keeps a list of mappings. You can put key-value pairs into the state and retrieve an iterable over all currently stored mappings. Mappings are added with `put(UK, UV)` or `putAll(Map<UK, UV>)`.
+ The value associated with a user key can be retrieved with `get(UK)`. Iterable views over the mappings, keys and values can be retrieved with `entries()`, `keys()` and `values()` respectively.
 
-All types of state also have a method `clear()` that clears the state for the currently
-active key, i.e. the key of the input element.
+All types of state also have a method `clear()` that clears the state for the currently active key, i.e. the key of the current input element.
 
-<span class="label label-danger">Attention</span> `FoldingState` and `FoldingStateDescriptor` have been deprecated in Flink 1.4 and will be completely removed in the future. Please use `AggregatingState` and `AggregatingStateDescriptor` instead.
+<span class="label label-danger">Attention</span> `FoldingState` and `FoldingStateDescriptor` have been deprecated since Flink 1.4 and will be completely removed in the future.
+Please use `AggregatingState` and `AggregatingStateDescriptor` instead.
 
-It is important to keep in mind that these state objects are only used for interfacing
-with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
-The second thing to keep in mind is that the value you get from the state
-depends on the key of the input element. So the value you get in one invocation of your
-user function can differ from the value in another invocation if the keys involved are different.
+Keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored in memory; it may reside on disk or somewhere else.
+Also keep in mind that the value you get from the state depends on the key of the input element, so calling the same interface on different keys may return different values.
 
-To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state
-(as we will see later, you can create several states, and they have to have unique names so
-that you can reference them), the type of the values that the state holds, and possibly
-a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
-want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`,
-a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`.
+To get a state handle, you have to create a `StateDescriptor`. It holds the state name (as we will see later, you can create several states, and they have to have unique names so that you can reference them),
+the type of the values that the state holds, and possibly a user-specified function, such as a `ReduceFunction`. Depending on the type of state you want to retrieve, you create a `ValueStateDescriptor`, `ListStateDescriptor`,
+`ReducingStateDescriptor`, `FoldingStateDescriptor` or `MapStateDescriptor`.
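+
+For example, a descriptor for a `ReducingState<Long>` that keeps a running sum (the state name is illustrative):
+
+{% highlight java %}
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+
+ReducingStateDescriptor<Long> sumDescriptor = new ReducingStateDescriptor<>(
+    "running-sum",                            // unique state name
+    (ReduceFunction<Long>) (a, b) -> a + b,   // how a new value is merged into the aggregate
+    Types.LONG);                              // type of the stored value
+{% endhighlight %}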
 
-State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
-Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for
-information about that, but we will also see an example shortly. The `RuntimeContext` that
-is available in a `RichFunction` has these methods for accessing state:
+State is accessed through the `RuntimeContext`, so it is only possible in *rich functions*. Please see [here]({{site.baseurl}}/zh/dev/api_concepts.html#rich-functions) for more information,
+but we will also see an example shortly. The `RuntimeContext` available in a `RichFunction` provides the following methods for accessing state:
 
 * `ValueState<T> getState(ValueStateDescriptor<T>)`
 * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
@@ -146,7 +111,7 @@ is available in a `RichFunction` has these methods for accessing state:
 * `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
 * `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
 
-This is an example `FlatMapFunction` that shows how all of the parts fit together:
+Here is an example `FlatMapFunction` that shows how all of the parts fit together:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -260,23 +225,16 @@ object ExampleCountWindowAverage extends App {
 </div>
 </div>
 
-This example implements a poor man's counting window. We key the tuples by the first field
-(in the example all have the same key `1`). The function stores the count and a running sum in
-a `ValueState`. Once the count reaches 2 it will emit the average and clear the state so that
-we start over from `0`. Note that this would keep a different state value for each different input
-key if we had tuples with different values in the first field.
+This example implements a simple counting window. We key the tuples by the first field (in the example all records share the same key, `1`). The function stores the count and a running sum in a `ValueState`.
+Once the count reaches 2, it emits the average downstream and clears the state so that we start over. Note that a separate state value is kept for each distinct key (the first field of the tuple).
 
-### State Time-To-Live (TTL)
+### State Time-To-Live (TTL)
 
-A *time-to-live* (TTL) can be assigned to the keyed state of any type. If a TTL is configured and a
-state value has expired, the stored value will be cleaned up on a best effort basis which is
-discussed in more detail below.
+A *time-to-live* (TTL) can be assigned to keyed state of any type. If a TTL is configured and a state value has expired, the stored value will be cleaned up on a best-effort basis, as discussed in more detail below.
 
-All state collection types support per-entry TTLs. This means that list elements and map entries
-expire independently.
+All state collection types support per-entry TTLs. This means that list elements and map entries expire independently.
 
-In order to use state TTL one must first build a `StateTtlConfig` configuration object. The TTL 
-functionality can then be enabled in any state descriptor by passing the configuration:
+To use state TTL, one must first build a `StateTtlConfig` configuration object. The TTL functionality is then enabled by passing the configuration to a state descriptor:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -314,59 +272,45 @@ stateDescriptor.enableTimeToLive(ttlConfig)
 </div>
 </div>
 
-The configuration has several options to consider:
-
-The first parameter of the `newBuilder` method is mandatory, it is the time-to-live value.
+The configuration has several options to consider:
+
+The first parameter of the `newBuilder` method is mandatory; it is the time-to-live value.
 
-The update type configures when the state TTL is refreshed (by default `OnCreateAndWrite`):
+The update type configures when the state TTL is refreshed (by default `OnCreateAndWrite`):
 
- - `StateTtlConfig.UpdateType.OnCreateAndWrite` - only on creation and write access
- - `StateTtlConfig.UpdateType.OnReadAndWrite` - also on read access
+ - `StateTtlConfig.UpdateType.OnCreateAndWrite` - refresh only on creation and write access
+ - `StateTtlConfig.UpdateType.OnReadAndWrite` - also refresh on read access
  
-The state visibility configures whether the expired value is returned on read access 
-if it is not cleaned up yet (by default `NeverReturnExpired`):
+The state visibility configures whether an expired value that has not been cleaned up yet is returned on read access (by default `NeverReturnExpired`):
 
- - `StateTtlConfig.StateVisibility.NeverReturnExpired` - expired value is never returned
- - `StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp` - returned if still available
+ - `StateTtlConfig.StateVisibility.NeverReturnExpired` - expired values are never returned
+ - `StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp` - expired values are returned if they have not been cleaned up yet
  
-In case of `NeverReturnExpired`, the expired state behaves as if it does not exist anymore, 
-even if it still has to be removed. The option can be useful for use cases 
-where data has to become unavailable for read access strictly after TTL, 
-e.g. application working with privacy sensitive data.
- 
-Another option `ReturnExpiredIfNotCleanedUp` allows to return the expired state before its cleanup.
+With `NeverReturnExpired`, expired state behaves as if it no longer exists, whether or not it has been physically removed. This is useful when data must become strictly unavailable after the TTL, e.g. for privacy-sensitive data.
+With `ReturnExpiredIfNotCleanedUp`, expired state is still returned until it is physically removed.
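+
+Putting these options together (a minimal sketch; the TTL value and state name are illustrative):
+
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.days(1))                                               // mandatory time-to-live
+    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)                // refresh the TTL on reads as well
+    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // never hand out expired values
+    .build();
+
+ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("text-state", String.class);
+descriptor.enableTimeToLive(ttlConfig);
+{% endhighlight %}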
 
-**Notes:** 
+**Notes:**
 
-- The state backends store the timestamp of the last modification along with the user value, 
-which means that enabling this feature increases consumption of state storage. 
-Heap state backend stores an additional Java object with a reference to the user state object 
-and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry.
+- The state backends store the timestamp of the last modification along with the user value, so enabling this feature increases the state storage footprint.
+The heap state backend stores an additional Java object with a reference to the user state object and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry.
 
-- Only TTLs in reference to *processing time* are currently supported.
+- Only TTLs based on *processing time* are currently supported.
 
-- Trying to restore state, which was previously configured without TTL, using TTL enabled descriptor or vice versa
-will lead to compatibility failure and `StateMigrationException`.
+- When restoring from a checkpoint/savepoint, the TTL setting of the state (enabled or not) must match what was configured before, otherwise a `StateMigrationException` is thrown.
 
-- The TTL configuration is not part of check- or savepoints but rather a way of how Flink treats it in the currently running job.
+- The TTL configuration is not part of checkpoints or savepoints; it only applies to the currently running job.
 
-- The map state with TTL currently supports null user values only if the user value serializer can handle null values. 
-If the serializer does not support null values, it can be wrapped with `NullableSerializer` at the cost of an extra byte in the serialized form.
+- Map state with TTL currently supports null user values only if the user value serializer can handle null values.
+If the serializer does not support null values, it can be wrapped with `NullableSerializer`, at the cost of an extra byte in the serialized form.
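+
+A sketch of the wrapping (assuming the static factory `NullableSerializer.wrap(serializer, padNullValueIfFixedLen)`; verify against the API of your Flink version):
+
+{% highlight java %}
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+
+TypeSerializer<String> original = ...; // a serializer that cannot handle nulls
+TypeSerializer<String> nullable = NullableSerializer.wrap(original, false); // spends one extra byte per value
+{% endhighlight %}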
 
-#### Cleanup of Expired State
+#### Cleanup of Expired State
 
-By default, expired values are only removed when they are read out explicitly,
-e.g. by calling `ValueState.value()`.
+By default, expired values are removed only when they are explicitly read out, e.g. by calling `ValueState.value()`.
 
-<span class="label label-danger">Attention</span> This means that by default if expired state is not read,
-it won't be removed, possibly leading to ever growing state. This might change in future releases.
+<span class="label label-danger">Attention</span> This means that, by default, expired state that is never read will not be removed, which can lead to ever-growing state. This behavior may change in future releases.
 
-##### Cleanup in full snapshot
+##### Cleanup in full snapshot
 
-Additionally, you can activate the cleanup at the moment of taking the full state snapshot which
-will reduce its size. The local state is not cleaned up under the current implementation
-but it will not include the removed expired state in case of restoration from the previous snapshot.
-It can be configured in `StateTtlConfig`:
+Additionally, you can activate cleanup at the moment a full state snapshot is taken, which reduces the snapshot size. Local state is not cleaned up under the current implementation, but expired state will not be included when restoring from the previous snapshot. It can be configured in `StateTtlConfig`:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -394,22 +338,17 @@ val ttlConfig = StateTtlConfig
 </div>
 </div>
 
-This option is not applicable for the incremental checkpointing in the RocksDB state backend.
+This option is not applicable to incremental checkpointing in the RocksDB state backend.
 
-**Notes:**
-- For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`,
-e.g. after restart from savepoint.
+**Notes:**
+- For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`, e.g. after restoring from a savepoint.
 
-##### Incremental cleanup
+##### Incremental cleanup
 
-Another option is to trigger cleanup of some state entries incrementally.
-The trigger can be a callback from each state access or/and each record processing.
-If this cleanup strategy is active for certain state,
-The storage backend keeps a lazy global iterator for this state over all its entries.
-Every time incremental cleanup is triggered, the iterator is advanced.
-The traversed state entries are checked and expired ones are cleaned up.
+Another option is to clean up some state entries incrementally, triggered on state access and/or record processing. If this cleanup strategy is active for a certain state,
+the storage backend keeps a lazy global iterator over all entries of that state. Every time incremental cleanup is triggered, the iterator is advanced, the traversed entries are checked, and expired ones are cleaned up.
 
-This feature can be activated in `StateTtlConfig`:
+This feature can be activated in `StateTtlConfig`:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -417,7 +356,7 @@ This feature can be activated in `StateTtlConfig`:
 import org.apache.flink.api.common.state.StateTtlConfig;
  StateTtlConfig ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupIncrementally()
+    .cleanupIncrementally(10, true)
     .build();
 {% endhighlight %}
 </div>
@@ -426,37 +365,28 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.StateTtlConfig
 val ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupIncrementally
+    .cleanupIncrementally(10, true)
     .build
 {% endhighlight %}
 </div>
 </div>
 
-This strategy has two parameters. The first one is number of checked state entries per each cleanup triggering.
-If enabled, it is always triggered per each state access.
-The second parameter defines whether to trigger cleanup additionally per each record processing.
+This strategy has two parameters. The first is the number of state entries checked per cleanup trigger; if enabled, cleanup is triggered on every state access. The second parameter defines whether cleanup is additionally triggered for every processed record.
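+
+The same configuration as above, with the two parameters annotated (the values are illustrative):
+
+{% highlight java %}
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupIncrementally(
+        10,    // check up to 10 state entries per cleanup trigger
+        true)  // also trigger cleanup for every processed record
+    .build();
+{% endhighlight %}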
 
-**Notes:**
-- If no access happens to the state or no records are processed, expired state will persist.
-- Time spent for the incremental cleanup increases record processing latency.
-- At the moment incremental cleanup is implemented only for Heap state backend. Setting it for RocksDB will have no effect.
-- If heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys
-while iterating because of its specific implementation which does not support concurrent modifications.
-Enabling of this feature will increase memory consumption then. Asynchronous snapshotting does not have this problem.
-- For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`,
-e.g. after restart from savepoint.
+**Notes:**
+- If no state is accessed and no records are processed, expired state persists.
+- Time spent on incremental cleanup increases record processing latency.
+- At the moment, incremental cleanup is implemented only for the heap state backend; setting it for RocksDB has no effect.
+- If the heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys while iterating, because its implementation does not support concurrent modifications; this increases memory consumption. Asynchronous snapshotting does not have this problem.
+- For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`, e.g. after restarting from a savepoint.
 
-##### Cleanup during RocksDB compaction
+##### Cleanup during RocksDB compaction
 
-If RocksDB state backend is used, another cleanup strategy is to activate Flink specific compaction filter.
-RocksDB periodically runs asynchronous compactions to merge state updates and reduce storage.
-Flink compaction filter checks expiration timestamp of state entries with TTL
-and excludes expired values.
+If the RocksDB state backend is used, another cleanup strategy is to activate a Flink-specific compaction filter. RocksDB periodically runs asynchronous compactions to merge state updates and reduce storage.
+The Flink compaction filter checks the expiration timestamps of state entries with TTL and excludes expired values during compaction.
 
-This feature is disabled by default. It has to be firstly activated for the RocksDB backend
-by setting Flink configuration option `state.backend.rocksdb.ttl.compaction.filter.enabled`
-or by calling `RocksDBStateBackend::enableTtlCompactionFilter` if a custom RocksDB state backend is created for a job.
-Then any state with TTL can be configured to use the filter:
+This feature is disabled by default. It has to be activated first for the RocksDB backend, either via the Flink configuration option `state.backend.rocksdb.ttl.compaction.filter.enabled`
+or by calling `RocksDBStateBackend::enableTtlCompactionFilter` if a custom RocksDB state backend is created for the job. Then any state with a TTL configuration can be set up to use the filter:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -482,35 +412,24 @@ val ttlConfig = StateTtlConfig
 </div>
 </div>
 
-RocksDB compaction filter will query current timestamp, used to check expiration, from Flink every time
-after processing certain number of state entries. This number is 1000 by default.
-You can optionally change it and pass a custom value to
-`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)` method.
-Updating the timestamp more often can improve cleanup speed
-but it decreases compaction performance because it uses JNI call from native code.
-
-You can activate debug logs from the native code of RocksDB filter
-by activating debug level for `FlinkCompactionFilter`:
+The RocksDB compaction filter queries the current timestamp, used to check expiration, from Flink after processing a certain number of state entries (1000 by default). You can change this by passing a custom value to the
+`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)` method.
+Updating the timestamp more often can improve cleanup speed, but it decreases compaction performance because it requires a JNI call from native code.
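+
+For example (the threshold value is illustrative):
+
+{% highlight java %}
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupInRocksdbCompactFilter(1000) // query the current timestamp from Flink every 1000 processed entries
+    .build();
+{% endhighlight %}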
 
+You can also enable debug logging from the native code of the RocksDB filter by activating the debug level for `FlinkCompactionFilter`:
 `log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG`
 
-**Notes:**
-- Calling of TTL filter during compaction slows it down.
-The TTL filter has to parse timestamp of last access and check its expiration
-for every stored state entry per key which is being compacted.
-In case of collection state type (list or map) the check is also invoked per stored element.
-- If this feature is used with a list state which has elements with non-fixed byte length,
-the native TTL filter has to call additionally a Flink java type serializer of the element over JNI per each state entry
-where at least the first element has expired to determine the offset of the next unexpired element.
-- For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`,
-e.g. after restart from savepoint.
+**Notes:**
+- Calling the TTL filter during compaction slows compaction down. The TTL filter has to parse the timestamp of the last access and check expiration for every stored state entry of the key being compacted.
+For collection state types (such as list or map), the check is also invoked per stored element.
+- If this feature is used with list state whose elements have a non-fixed byte length, the native TTL filter additionally has to call the element's Flink Java type serializer over JNI for each state entry where at least the first element has expired,
+in order to determine the offset of the next unexpired element.
+- For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`, e.g. after restarting from a savepoint.
 
-### State in the Scala DataStream API
+### State in the Scala DataStream API
 
-In addition to the interface described above, the Scala API has shortcuts for stateful
-`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. The user function
-gets the current value of the `ValueState` in an `Option` and must return an updated value that
-will be used to update the state.
+In addition to the interfaces described above, the Scala API provides shortcuts for stateful `map()` or `flatMap()` functions with a single `ValueState` on a `KeyedStream`.
+The user function receives the current value of the `ValueState` as an `Option` and returns an updated value that is used to update the state.
 
 {% highlight scala %}
 val stream: DataStream[(String, Int)] = ...
@@ -524,15 +443,13 @@ val counts: DataStream[(String, Int)] = stream
     })
 {% endhighlight %}
 
-## Using Managed Operator State
+## Using Managed Operator State
 
-To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
-interface, or the `ListCheckpointed<T extends Serializable>` interface.
+To use managed operator state, a stateful function can implement either the `CheckpointedFunction` interface or the `ListCheckpointed<T extends Serializable>` interface.
 
 #### CheckpointedFunction
 
-The `CheckpointedFunction` interface provides access to non-keyed state with different
-redistribution schemes. It requires the implementation of two methods:
+The `CheckpointedFunction` interface provides access to non-keyed state and requires implementing the following two methods:
 
 {% highlight java %}
 void snapshotState(FunctionSnapshotContext context) throws Exception;
@@ -540,30 +457,18 @@ void snapshotState(FunctionSnapshotContext context) throws Exception;
 void initializeState(FunctionInitializationContext context) throws Exception;
 {% endhighlight %}
 
-Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
-is called every time the user-defined function is initialized, be that when the function is first initialized
-or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
-only the place where different types of state are initialized, but also where state recovery logic is included.
+`snapshotState()` is called whenever a checkpoint is taken. Its counterpart, `initializeState()`, is called every time the user-defined function is initialized, both when the function is first initialized and when it is recovering from an earlier checkpoint.
+Therefore, `initializeState()` is not only the place where different types of state are initialized, but also where the state recovery logic goes.
 
-Currently, list-style managed operator state is supported. The state
-is expected to be a `List` of *serializable* objects, independent from each other,
-thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be redistributed. Depending on the state accessing method,
-the following redistribution schemes are defined:
+Currently, list-style managed operator state is supported. The state is expected to be a `List` of *serializable* objects that are independent of each other, so that they can be redistributed upon rescaling.
+In other words, these objects are the finest granularity at which non-keyed state can be redistributed. Depending on the state access method, the following redistribution schemes are defined:
 
-  - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
-    all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
-    Each operator gets a sublist, which can be empty, or contain one or more elements.
-    As an example, if with parallelism 1 the checkpointed state of an operator
-    contains elements `element1` and `element2`, when increasing the parallelism to 2, `element1` may end up in operator instance 0,
-    while `element2` will go to operator instance 1.
+  - **Even-split redistribution:** Each operator returns a list of state elements; the whole state is logically the concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operator instances.
+    For example, if an operator with parallelism 1 has checkpointed elements `element1` and `element2`, increasing the parallelism to 2 may place `element1` on instance 0 and `element2` on instance 1.
 
-  - **Union redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
-    all lists. On restore/redistribution, each operator gets the complete list of state elements.
+  - **Union redistribution:** Each operator returns a list of state elements; the whole state is logically the concatenation of all lists. On restore/redistribution, every operator instance gets the complete list of state elements.
 
-Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction`
-to buffer elements before sending them to the outside world. It demonstrates
-the basic even-split redistribution list state:
+The example below shows a stateful `SinkFunction` that uses `CheckpointedFunction` to buffer elements before sending them downstream. It demonstrates basic even-split redistribution of list state:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -669,13 +574,10 @@ class BufferingSink(threshold: Int = 0)
 </div>
 </div>
 
-The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize
-the non-keyed state "containers". These are a container of type `ListState` where the non-keyed state objects
-are going to be stored upon checkpointing.
+The `initializeState` method takes a `FunctionInitializationContext` as argument, which is used to initialize the non-keyed state "containers". These are containers of type `ListState`
+where the non-keyed state objects are stored upon checkpointing.
 
-Note how the state is initialized, similar to keyed state,
-with a `StateDescriptor` that contains the state name and information
-about the type of the value that the state holds:
+Note how the state is initialized: similarly to keyed state, with a `StateDescriptor` that contains the state name and information about the type of the values that the state holds.
 
 
 <div class="codetabs" markdown="1">
@@ -703,27 +605,20 @@ checkpointedState = context.getOperatorStateStore.getListState(descriptor)
 {% endhighlight %}
 </div>
 </div>
-The naming convention of the state access methods contain its redistribution
-pattern followed by its state structure. For example, to use list state with the
-union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`.
-If the method name does not contain the redistribution pattern, *e.g.* `getListState(descriptor)`,
-it simply implies that the basic even-split redistribution scheme will be used.
 
-After initializing the container, we use the `isRestored()` method of the context to check if we are
-recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
+Different state-access methods imply different redistribution schemes: for example, `getUnionListState(descriptor)` uses the union redistribution scheme,
+while `getListState(descriptor)` simply uses the basic even-split redistribution scheme.
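+
+A sketch of the two access styles, reusing the `descriptor` from the snippet above:
+
+{% highlight java %}
+// even-split redistribution on restore (the default)
+ListState<Tuple2<String, Integer>> evenSplit =
+    context.getOperatorStateStore().getListState(descriptor);
+
+// union redistribution on restore: every instance gets the full list
+ListState<Tuple2<String, Integer>> union =
+    context.getOperatorStateStore().getUnionListState(descriptor);
+{% endhighlight %}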
+
+After initializing the containers, we use the `isRestored()` method of the context to check whether we are recovering from a failure. If it returns `true`, i.e. we are recovering, the restore logic is applied.
 
-As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
-initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared
-of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
+As shown in the code of the modified `BufferingSink`, the `ListState` recovered during state initialization is kept in a class variable for later use in `snapshotState()`.
+There the `ListState` is cleared of all objects from the previous checkpoint and then filled with the new objects to be checkpointed.
 
-As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
-using the provided `FunctionInitializationContext`.
+As a side note, keyed state can also be initialized in the `initializeState()` method, using the provided `FunctionInitializationContext`.
 
 #### ListCheckpointed
 
-The `ListCheckpointed` interface is a more limited variant of `CheckpointedFunction`,
-which only supports list-style state with even-split redistribution scheme on restore.
-It also requires the implementation of two methods:
+The `ListCheckpointed` interface is a more limited variant of `CheckpointedFunction` that only supports list-style state with the even-split redistribution scheme on restore. It also requires implementing two methods:
 
 {% highlight java %}
 List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
@@ -731,15 +626,12 @@ List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
 void restoreState(List<T> state) throws Exception;
 {% endhighlight %}
 
-On `snapshotState()` the operator should return a list of objects to checkpoint and
-`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
-return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
+`snapshotState()` should return a list of objects to checkpoint, and `restoreState` has to handle such a list upon recovery. If the state is not re-partitionable,
+you can always return `Collections.singletonList(MY_STATE)` from `snapshotState()`.
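+
+A minimal sketch of an implementation (class and field names are illustrative):
+
+{% highlight java %}
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+
+public class CountingFunction extends RichMapFunction<String, String>
+        implements ListCheckpointed<Long> {
+
+    private long count = 0L;
+
+    @Override
+    public List<Long> snapshotState(long checkpointId, long timestamp) {
+        // the counter is not re-partitionable, so return a single-element list
+        return Collections.singletonList(count);
+    }
+
+    @Override
+    public void restoreState(List<Long> state) {
+        // after rescaling, an instance may receive several sublist elements to merge
+        for (Long c : state) {
+            count += c;
+        }
+    }
+
+    @Override
+    public String map(String value) {
+        count++;
+        return value;
+    }
+}
+{% endhighlight %}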
 
-### Stateful Source Functions
+### Stateful Source Functions
 
-Stateful sources require a bit more care as opposed to other operators.
-In order to make the updates to the state and output collection atomic (required for exactly-once semantics
-on failure/recovery), the user is required to get a lock from the source's context.
+Stateful sources require a bit more care than other operators. To make updates to the state and emission of output atomic (required for exactly-once semantics on failure/recovery), the user has to acquire the lock from the source's context before emitting data.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -825,6 +717,6 @@ class CounterSource
 </div>
 </div>
 
-Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
+Operators that need to be notified once a checkpoint has been fully acknowledged by Flink can look at the `org.apache.flink.runtime.state.CheckpointListener` interface.
 
 {% top %}