Posted to commits@flink.apache.org by rm...@apache.org on 2020/07/30 05:39:41 UTC

[flink] branch master updated: [FLINK-17886][docs-zh] Update Chinese documentation for new WatermarkGenerator/WatermarkStrategies

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 239372a  [FLINK-17886][docs-zh] Update Chinese documentation for new WatermarkGenerator/WatermarkStrategies
239372a is described below

commit 239372a5d085fa1ee075c368f3e438383ad5d6e6
Author: Yichao Yang <10...@qq.com>
AuthorDate: Tue Jun 16 11:33:18 2020 +0800

    [FLINK-17886][docs-zh] Update Chinese documentation for new WatermarkGenerator/WatermarkStrategies
    
    This closes #12665
---
 docs/dev/event_time.zh.md                  |  21 +-
 docs/dev/event_timestamp_extractors.zh.md  |  70 ++---
 docs/dev/event_timestamps_watermarks.zh.md | 393 +++++++++++++++--------------
 3 files changed, 234 insertions(+), 250 deletions(-)

diff --git a/docs/dev/event_time.zh.md b/docs/dev/event_time.zh.md
index 104713e..478cc6e 100644
--- a/docs/dev/event_time.zh.md
+++ b/docs/dev/event_time.zh.md
@@ -24,13 +24,13 @@ specific language governing permissions and limitations
 under the License.
 -->
 
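-In this section you will learn how to write Flink programs that are aware of time changes. You can first read [Timely Stream Processing]({% link concepts/timely-stream-processing.zh.md %}) to understand the related concepts.
+In this section you will learn how to write time-aware Flink programs. See the [Timely Stream Processing]({% link concepts/timely-stream-processing.zh.md %}) section to learn about the concepts behind timely stream processing.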
 
-To learn how to use time in Flink programs, refer to [Windows]({% link dev/stream/operators/windows.zh.md %}) and [Process Function]({% link dev/stream/operators/process_function.zh.md %}).
+For information on how to use time in Flink programs, refer to the [Windows]({% link dev/stream/operators/windows.zh.md %}) and [ProcessFunction]({% link dev/stream/operators/process_function.zh.md %}) sections.
 
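-A prerequisite for stream processing with *event time* is setting the appropriate *time characteristic*. This setting defines how the data stream behaves (for example, whether timestamps are assigned) and which notion of time window operators such as `KeyedStream.timeWindow(Time.seconds(30))` use.
+Before processing data with *event time*, the program needs to set the correct *time characteristic*. This setting defines how the source data is handled (for example, whether the program assigns timestamps to the data) and which notion of time the program uses for window operations such as `KeyedStream.timeWindow(Time.seconds(30))`.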
 
-You can set the time characteristic by calling `StreamExecutionEnvironment.setStreamTimeCharacteristic()`:
+The program's time characteristic is set via `StreamExecutionEnvironment.setStreamTimeCharacteristic()`, as shown below:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -69,19 +69,16 @@ env = StreamExecutionEnvironment.get_execution_environment()
 
 env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
 
-# alternatively:
-# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
-# env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
 {% endhighlight %}
 </div>
 </div>
 
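-Note that in order to run this example with *event time* as the time characteristic, the program needs to use sources that directly define event time for the data and emit watermarks themselves, or the program must inject a "timestamp assigner and watermark generator" after the sources. These functions describe how to access the event timestamps and what degree of out-of-orderness the event stream exhibits.
+Note: to run the example above with *event time* semantics, the program must satisfy one of two conditions: either the sources it consumes define event time for their data directly and emit watermarks, or the program must explicitly declare a *Timestamp Assigner & Watermark Generator* after the sources. These functions define how the Flink program accesses event timestamps and how out-of-order the event stream is.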
 
-## Where to go next?
+## What to learn next?
 
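-* [Generating Watermarks]({% link dev/event_timestamps_watermarks.zh.md %}): describes how to define timestamp assigners and watermark generators when writing event-time-aware Flink applications.
-* [Builtin Watermark Generators]({% link dev/event_timestamp_extractors.zh.md %}): an overview of the watermark generators that ship with Flink.
-* [Debugging Windows & Event Time]({% link monitoring/debugging_event_time.zh.md %}): describes how to debug watermarks and timestamps in event-time-aware Flink applications.
+* [Generating Watermarks]({% link dev/event_timestamps_watermarks.zh.md %}): shows how to write the timestamp assigners and watermark generators that event-time-aware Flink applications need.
+* [Builtin Watermark Generators]({% link dev/event_timestamp_extractors.zh.md %}): an overview of the watermark generators built into Flink.
+* [Debugging Windows and Event Time]({% link monitoring/debugging_event_time.zh.md %}): shows how to debug watermark- and timestamp-related problems in Flink applications with event-time semantics.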
 
 {% top %}
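To make the role of event time in a window such as `KeyedStream.timeWindow(Time.seconds(30))` concrete, here is a minimal plain-Java sketch with no Flink dependency. The class and method names are hypothetical. It assumes tumbling windows without an offset, computes the window start with `Math.floorMod`, and treats a window `[start, start + 30 s)` as ready to fire once the watermark reaches its last timestamp (`end - 1`). It is an illustration of the idea, not Flink's implementation.

```java
/**
 * Hypothetical, Flink-independent sketch of a 30-second event-time tumbling window,
 * i.e. the kind of window created by KeyedStream.timeWindow(Time.seconds(30)).
 */
public class TumblingWindowSketch {

    /** Window length in milliseconds, mirroring Time.seconds(30). */
    public static final long WINDOW_SIZE_MS = 30_000L;

    /** Start of the window [start, start + size) that contains the timestamp (no offset). */
    public static long windowStart(long timestamp) {
        // floorMod keeps negative timestamps in the correct (earlier) window
        return timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
    }

    /** A window can fire once the watermark has reached its last timestamp (end - 1). */
    public static boolean canFire(long windowStart, long watermark) {
        long maxTimestamp = windowStart + WINDOW_SIZE_MS - 1;
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long[] eventTimestamps = {1_000L, 29_999L, 30_000L, 61_500L};
        for (long ts : eventTimestamps) {
            System.out.println(ts + " -> window starting at " + windowStart(ts));
        }
        // The window [0, 30000) is complete only once the watermark reaches 29999
        System.out.println(canFire(0L, 29_998L)); // prints false
        System.out.println(canFire(0L, 29_999L)); // prints true
    }
}
```

Under these assumptions, events at 1000 ms and 29999 ms share the window starting at 0, while 30000 ms opens the next one; no result for a window is final until the watermark passes its end.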
diff --git a/docs/dev/event_timestamp_extractors.zh.md b/docs/dev/event_timestamp_extractors.zh.md
index 01b3634..9474bba 100644
--- a/docs/dev/event_timestamp_extractors.zh.md
+++ b/docs/dev/event_timestamp_extractors.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "Pre-defined Timestamp Extractors / Watermark Emitters"
+title: "Builtin Watermark Generators"
 nav-parent_id: event_time
 nav-pos: 2
 ---
@@ -25,83 +25,47 @@ under the License.
 * toc
 {:toc}
 
-As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending
-on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of
-the incoming records, e.g. whenever a special element is encountered in the stream.
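+As described in [Generating Watermarks]({% link dev/event_timestamps_watermarks.zh.md %}), Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. You can do this by implementing the `WatermarkGenerator` interface.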
 
-In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.
-This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example
-for custom implementations.
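+To further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners, which are listed in the rest of this section. Apart from working out of the box, their implementations can serve as examples for custom implementations.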
 
-### **Assigners with ascending timestamps**
+<a name="monotonously-increasing-timestamps"></a>
 
-The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task
-occur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps will
-arrive.
+## Monotonously Increasing Timestamps
 
-Note that it is only necessary that timestamps are ascending *per parallel data source task*. For example, if
-in a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary that
-timestamps are ascending within each Kafka partition. Flink's watermark merging mechanism will generate correct
-watermarks whenever parallel streams are shuffled, unioned, connected, or merged.
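+The simplest special case for *periodic* watermark generation is when the timestamps seen by a given source task occur in ascending order. In that case, the current timestamp can always act as a watermark, because no data with an earlier timestamp will arrive.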
+
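+Note: with parallel sources, timestamps only need to be ascending *per parallel data source task*. For example, if in a specific setup each parallel data source instance reads exactly one Kafka partition, timestamps only need to be ascending within each Kafka partition. Flink's watermark merging mechanism will generate correct watermarks whenever parallel streams are shuffled, unioned, connected, or merged.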
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks =
-    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
-
-        @Override
-        public long extractAscendingTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
+WatermarkStrategy.forMonotonousTimestamps();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
+WatermarkStrategy.forMonotonousTimestamps()
 {% endhighlight %}
 </div>
 </div>
 
-### **Assigners allowing a fixed amount of lateness**
+<a name="fixed-amount-of-lateness"></a>
 
-Another example of periodic watermark generation is when the watermark lags behind the maximum (event-time) timestamp
-seen in the stream by a fixed amount of time. This case covers scenarios where the maximum lateness that can be encountered in a
-stream is known in advance, e.g. when creating a custom source containing elements with timestamps spread within a fixed period of
-time for testing. For these cases, Flink provides the `BoundedOutOfOrdernessTimestampExtractor` which takes as an argument
-the `maxOutOfOrderness`, i.e. the maximum amount of time an element is allowed to be late before being ignored when computing the
-final result for the given window. Lateness corresponds to the result of `t - t_w`, where `t` is the (event-time) timestamp of an
-element, and `t_w` that of the previous watermark. If `lateness > 0` then the element is considered late and is, by default, ignored when computing
-the result of the job for its corresponding window. See the documentation about [allowed lateness]({{ site.baseurl }}/dev/stream/operators/windows.html#allowed-lateness)
-for more information about working with late elements.
+## Fixed Amount of Lateness
+
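+Another example of periodic watermark generation is when the watermark lags behind the maximum (event-time) timestamp seen in the stream by a fixed amount of time. This covers scenarios where the maximum lateness that can be encountered in a stream is known in advance, e.g. when creating a custom source for testing whose elements have timestamps spread within a fixed period of time. For these cases, Flink provides the `BoundedOutOfOrdernessWatermarks` generator, which takes `maxOutOfOrderness` as an argument, i.e. the maximum amount of time an element is allowed to be late before being ignored when computing the final result for the given window. Lateness corresponds to `t_w - t`, where `t` is the (event-time) timestamp of an element and `t_w` that of the previous watermark. If `lateness > 0`, the element is considered late and is, by default, ignored when computing the result of the job for its corresponding window. See the documentation about [allowed lateness]({% link dev/stream/operators/windows.md %}#allowed-lateness) for more information about working with late elements.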
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks =
-    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
-
-        @Override
-        public long extractTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
+WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))
+WatermarkStrategy
+  .forBoundedOutOfOrderness(Duration.ofSeconds(10))
 {% endhighlight %}
 </div>
 </div>
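The watermark arithmetic of the two strategies above can be sketched without Flink. This hypothetical `BoundedOutOfOrdernessSketch` follows the `BoundedOutOfOrdernessGenerator` example in the watermark documentation (watermark = highest timestamp seen minus the bound minus 1 ms), treats monotonous timestamps as the zero-bound case (an assumption of this sketch), and computes lateness as `t_w - t` as defined above.

```java
/**
 * Hypothetical, Flink-independent model of bounded-out-of-orderness watermarks.
 * Monotonous timestamps are modeled as the special case with a zero bound.
 */
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    // Start low enough that the first computed watermark is Long.MIN_VALUE, without underflow
    private long maxTimestampSeen;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Plays the role of onEvent(): only remembers the largest timestamp seen so far. */
    public void onEvent(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
    }

    /** Plays the role of onPeriodicEmit(): the watermark lags the maximum by the bound. */
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }

    /** Lateness t_w - t; a positive value means the element is late. */
    public static long lateness(long elementTimestamp, long watermark) {
        return watermark - elementTimestamp;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(10_000L); // 10 s
        for (long ts : new long[] {20_000L, 35_000L, 30_000L}) {
            gen.onEvent(ts);
        }
        long wm = gen.currentWatermark(); // 35000 - 10000 - 1 = 24999
        System.out.println("watermark = " + wm);
        System.out.println("lateness of t=20000: " + lateness(20_000L, wm)); // 4999, late
        System.out.println("lateness of t=30000: " + lateness(30_000L, wm)); // -5001, on time

        BoundedOutOfOrdernessSketch mono = new BoundedOutOfOrdernessSketch(0L);
        mono.onEvent(42L);
        System.out.println("monotonous watermark = " + mono.currentWatermark()); // 41
    }
}
```

The out-of-order event at 30000 ms does not move the watermark back, because only the maximum timestamp is tracked.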
diff --git a/docs/dev/event_timestamps_watermarks.zh.md b/docs/dev/event_timestamps_watermarks.zh.md
index 28e1f2f..cc68b1d 100644
--- a/docs/dev/event_timestamps_watermarks.zh.md
+++ b/docs/dev/event_timestamps_watermarks.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "Generating Timestamps / Watermarks"
+title: "Generating Watermarks"
 nav-parent_id: event_time
 nav-pos: 1
 ---
@@ -22,115 +22,78 @@ specific language governing permissions and limitations
 under the License.
 -->
 
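+In this section you will learn about the APIs that Flink provides for working with **event time** timestamps and watermarks. For an introduction to *event time*, *processing time*, and *ingestion time*, please refer to the [introduction to event time]({% link dev/event_time.zh.md %}).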
+
 * toc
 {:toc}
 
+<a name="introduction-to-watermark-strategies"></a>
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
-
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
-progress in event time.
+## Introduction to Watermark Strategies
 
-There are two ways to assign timestamps and generate watermarks:
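+In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the timestamp from some field in the element using a `TimestampAssigner`.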
 
-  1. Directly in the data stream source
-  2. Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about progress in event time. You configure how watermarks are generated by specifying a `WatermarkGenerator`.
 
-<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
-milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
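+The Flink API expects a `WatermarkStrategy` that contains both a `TimestampAssigner` and a `WatermarkGenerator`. A number of common strategies are available out of the box as static methods on `WatermarkStrategy`, but users can also build their own strategies when required. This is the `WatermarkStrategy` interface: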
 
-### Source Functions with Timestamps and Watermarks
+{% highlight java %}
+public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
 
-Stream sources can directly assign timestamps to the elements they produce, and they can also emit watermarks.
-When this is done, no timestamp assigner is needed.
-Note that if a timestamp assigner is used, any timestamps and watermarks provided by the source will be overwritten.
+    /**
+     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this strategy.
+     */
+    @Override
+    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
 
-To assign a timestamp to an element in the source directly, the source must use the `collectWithTimestamp(...)`
-method on the `SourceContext`. To generate watermarks, the source must call the `emitWatermark(Watermark)` function.
+    /**
+     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
+     */
+    @Override
+    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+}
+{% endhighlight %}
 
-Below is a simple example of a *(non-checkpointed)* source that assigns timestamps and generates watermarks:
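+As mentioned, you usually don't implement this interface yourself but use the static helper methods on `WatermarkStrategy` for common watermark strategies, or to bundle a custom `TimestampAssigner` together with a `WatermarkGenerator`. For example, to use a bounded-out-of-orderness watermark generator and a lambda expression as the timestamp assigner, you can write: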
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-@Override
-public void run(SourceContext<MyType> ctx) throws Exception {
-	while (/* condition */) {
-		MyType next = getNext();
-		ctx.collectWithTimestamp(next, next.getEventTimestamp());
-
-		if (next.hasWatermarkTime()) {
-			ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
-		}
-	}
-}
+WatermarkStrategy
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withTimestampAssigner((event, timestamp) -> event.f0);
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-override def run(ctx: SourceContext[MyType]): Unit = {
-	while (/* condition */) {
-		val next: MyType = getNext()
-		ctx.collectWithTimestamp(next, next.eventTimestamp)
-
-		if (next.hasWatermarkTime) {
-			ctx.emitWatermark(new Watermark(next.getWatermarkTime))
-		}
-	}
-}
+WatermarkStrategy
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
+    override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1
+  })
 {% endhighlight %}
+
+(Using Scala lambdas here currently does not work because this is hard to support in Scala.)
 </div>
 </div>
 
+Specifying a `TimestampAssigner` is optional, and in most cases you don't actually need one. For example, when using Kafka or Kinesis you get timestamps directly from the Kafka/Kinesis records.
+
+We will look at the `WatermarkGenerator` interface later in [Writing WatermarkGenerators](#writing-watermarkgenerators).
+
+<div class="alert alert-warning">
+<strong>Attention</strong>: Both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
+</div>
 
-### Timestamp Assigners / Watermark Generators
+<a name="using-watermark-strategies"></a>
 
-Timestamp assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
-original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
+## Using Watermark Strategies
 
-Timestamp assigners are usually specified immediately after the data source, but it is not strictly required to do so.
-A common pattern, for example, is to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
-In any case, the timestamp assigner needs to be specified before the first operation on event time
-(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
-Flink allows the specification of a timestamp assigner / watermark emitter inside
-the source (or consumer) itself. More information on how to do so can be found in the
-[Kafka Connector documentation]({{ site.baseurl }}/dev/connectors/kafka.html).
+There are two places in Flink applications where a `WatermarkStrategy` can be used: 1) directly on sources and 2) after a non-source operation.
 
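+The first option is preferable, because it allows sources to exploit knowledge about shards/partitions/splits in the watermarking logic. Sources can usually then track watermarks at a finer granularity, and the overall watermark produced by the source will be more accurate. Specifying a `WatermarkStrategy` directly on the source usually means you have to use a source-specific interface; see [Watermark Strategies and the Kafka Connector](#watermark-strategies-and-the-kafka-connector) for how this works with the Kafka connector and for more details about how per-partition watermarking works there.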
 
-**NOTE:** The remainder of this section presents the main interfaces a programmer has
-to implement in order to create her own timestamp extractors/watermark emitters.
-To see the pre-implemented extractors that ship with Flink, please refer to the
-[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/dev/event_timestamp_extractors.html) page.
+The second option (setting a `WatermarkStrategy` after an arbitrary operation) should only be used if you cannot set a strategy directly on the source:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -144,7 +107,7 @@ DataStream<MyEvent> stream = env.readFile(
 
 DataStream<MyEvent> withTimestampsAndWatermarks = stream
         .filter( event -> event.severity() == WARNING )
-        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
+        .assignTimestampsAndWatermarks(<watermark strategy>);
 
 withTimestampsAndWatermarks
         .keyBy( (event) -> event.getGroup() )
@@ -164,7 +127,7 @@ val stream: DataStream[MyEvent] = env.readFile(
 
 val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
         .filter( _.severity == WARNING )
-        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
+        .assignTimestampsAndWatermarks(<watermark strategy>)
 
 withTimestampsAndWatermarks
         .keyBy( _.getGroup )
@@ -175,140 +138,192 @@ withTimestampsAndWatermarks
 </div>
 </div>
 
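+Using a `WatermarkStrategy` this way takes a stream and produces a new stream with timestamped elements and watermarks. If the original stream already had timestamps and/or watermarks, the timestamp assigner overwrites them.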
 
-#### **With Periodic Watermarks**
+<a name="dealing-with-idle-sources"></a>
 
-`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
-on the stream elements, or purely based on processing time).
+## Dealing With Idle Sources
 
-The interval (every *n* milliseconds) in which the watermark will be generated is defined via
-`ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner's `getCurrentWatermark()` method will be
-called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous
-watermark.
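+If one of the input splits/partitions/shards does not carry events for a while, the `WatermarkGenerator` does not get any new information on which to base a watermark. We call this an *idle input* or an *idle source*. This is a problem when other partitions still carry events: since the watermark of a downstream operator is computed as the minimum over all its upstream parallel watermarks, that watermark will not progress.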
 
-Here we show two simple examples of timestamp assigners that use periodic watermark generation. Note that Flink ships with a `BoundedOutOfOrdernessTimestampExtractor` similar to the `BoundedOutOfOrdernessGenerator` shown below, which you can read about [here]({{ site.baseurl }}/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness).
+To deal with this, you can use a `WatermarkStrategy` that detects idleness and marks an input as idle. `WatermarkStrategy` provides a convenience helper for this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+WatermarkStrategy
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withIdleness(Duration.ofMinutes(1));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+WatermarkStrategy
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withIdleness(Duration.ofMinutes(1))
+{% endhighlight %}
+</div>
+</div>
+
+
+<a name="writing-watermarkgenerators"></a>
+
+## Writing WatermarkGenerators
+
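+A `TimestampAssigner` is a simple function that extracts a timestamp field from an event, so we don't need to look at it in detail. A `WatermarkGenerator`, on the other hand, is a bit more complicated to write, and we will look at how to do that in the next two sections. This is the `WatermarkGenerator` interface: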
+
+{% highlight java %}
+/**
+ * The {@code WatermarkGenerator} generates watermarks either based on events or periodically.
+ *
+ * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
+ * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
+ */
+@Public
+public interface WatermarkGenerator<T> {
+
+    /**
+     * Called for every event; allows the watermark generator to examine and remember the
+     * event timestamps, or to emit a watermark based on the event itself.
+     */
+    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
+
+    /**
+     * Called periodically; might emit a new watermark, or not.
+     *
+     * <p>The interval in which this method is called and watermarks are generated
+     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
+     */
+    void onPeriodicEmit(WatermarkOutput output);
+}
+{% endhighlight %}
+
+There are two different styles of watermark generation: *periodic* and *punctuated*.
+
+A periodic generator usually observes the incoming events via `onEvent()` and then emits a watermark when the framework calls `onPeriodicEmit()`.
+
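+A punctuated generator looks at events in `onEvent()` and waits for special *marker events* or *punctuations* that carry watermark information in the stream. When it sees one of these events, it emits a watermark immediately. Usually, punctuated generators don't emit a watermark from `onPeriodicEmit()`.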
+
+Next, we will look at how to implement a generator for each style.
+
+<a name="writing-a-periodic-watermarkgenerator"></a>
+
+### Writing a Periodic WatermarkGenerator
+
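+A periodic generator observes stream events and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).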
+
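+The interval (every *n* milliseconds) in which watermarks are generated is defined via `ExecutionConfig.setAutoWatermarkInterval(...)`. The generator's `onPeriodicEmit()` method is called each time, and a watermark it emits is forwarded only if it is larger than the previous watermark.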
+
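+Here we show two simple examples of watermark generators that use periodic watermark generation. Note that Flink ships with `BoundedOutOfOrdernessWatermarks`, a `WatermarkGenerator` that works similarly to the `BoundedOutOfOrdernessGenerator` shown below. You can read about using it [here]({% link dev/event_timestamp_extractors.zh.md %}).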
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 /**
- * This generator generates watermarks assuming that elements arrive out of order,
- * but only to a certain degree. The latest elements for a certain timestamp t will arrive
- * at most n milliseconds after the earliest elements for timestamp t.
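+ * This generator generates watermarks assuming that elements arrive out of order,
+ * but only to a certain degree. The latest elements for a certain timestamp t will
+ * arrive at most n milliseconds after the earliest elements for timestamp t.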
  */
-public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
+public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
 
-    private final long maxOutOfOrderness = 3500; // 3.5 seconds
+    private final long maxOutOfOrderness = 3500; // 3.5 seconds
 
     private long currentMaxTimestamp;
 
     @Override
-    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-        long timestamp = element.getCreationTime();
-        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
-        return timestamp;
+    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
+        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
     }
 
     @Override
-    public Watermark getCurrentWatermark() {
-        // return the watermark as current highest timestamp minus the out-of-orderness bound
-        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
+    public void onPeriodicEmit(WatermarkOutput output) {
+        // emitted watermark = current highest timestamp - out-of-orderness bound - 1
+        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
     }
+
 }
 
 /**
- * This generator generates watermarks that are lagging behind processing time by a fixed amount.
- * It assumes that elements arrive in Flink after a bounded delay.
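+ * This generator generates watermarks that are lagging behind processing time by a fixed amount.
+ * It assumes that elements arrive in Flink after a bounded delay.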
  */
-public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
+public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
 
-	private final long maxTimeLag = 5000; // 5 seconds
+    private final long maxTimeLag = 5000; // 5 seconds
 
-	@Override
-	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-		return element.getCreationTime();
-	}
+    @Override
+    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
+        // don't need to do anything because we work on processing time
+    }
 
-	@Override
-	public Watermark getCurrentWatermark() {
-		// return the watermark as current time minus the maximum time lag
-		return new Watermark(System.currentTimeMillis() - maxTimeLag);
-	}
+    @Override
+    public void onPeriodicEmit(WatermarkOutput output) {
+        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
+    }
 }
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 /**
- * This generator generates watermarks assuming that elements arrive out of order,
- * but only to a certain degree. The latest elements for a certain timestamp t will arrive
- * at most n milliseconds after the earliest elements for timestamp t.
+ * This generator generates watermarks assuming that elements arrive out of order,
+ * but only to a certain degree. The latest elements for a certain timestamp t will
+ * arrive at most n milliseconds after the earliest elements for timestamp t.
  */
-class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
+class BoundedOutOfOrdernessGenerator extends WatermarkGenerator[MyEvent] {
 
-    val maxOutOfOrderness = 3500L // 3.5 seconds
+    val maxOutOfOrderness = 3500L // 3.5 seconds
 
     var currentMaxTimestamp: Long = _
 
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        val timestamp = element.getCreationTime()
-        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
-        timestamp
+    override def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
+        currentMaxTimestamp = math.max(eventTimestamp, currentMaxTimestamp)
     }
 
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current highest timestamp minus the out-of-orderness bound
-        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
+    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
+        // emitted watermark = current highest timestamp - out-of-orderness bound - 1
+        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1))
     }
 }
 
 /**
- * This generator generates watermarks that are lagging behind processing time by a fixed amount.
- * It assumes that elements arrive in Flink after a bounded delay.
+ * This generator generates watermarks that are lagging behind processing time by a fixed amount.
+ * It assumes that elements arrive in Flink after a bounded delay.
  */
-class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
+class TimeLagWatermarkGenerator extends WatermarkGenerator[MyEvent] {
 
-    val maxTimeLag = 5000L // 5 seconds
+    val maxTimeLag = 5000L // 5 seconds
 
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        element.getCreationTime
+    override def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
+        // don't need to do anything because we work on processing time
     }
 
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current time minus the maximum time lag
-        new Watermark(System.currentTimeMillis() - maxTimeLag)
+    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
+        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag))
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-#### **With Punctuated Watermarks**
+<a name="writing-a-punctuated-watermarkgenerator"></a>
+
+### Writing a Punctuated WatermarkGenerator
 
-To generate watermarks whenever a certain event indicates that a new watermark might be generated, use
-`AssignerWithPunctuatedWatermarks`. For this class Flink will first call the `extractTimestamp(...)` method
-to assign the element a timestamp, and then immediately call the
-`checkAndGetNextWatermark(...)` method on that element.
+A punctuated watermark generator observes the stream of events and emits a watermark whenever it sees a special element that carries watermark information.
 
-The `checkAndGetNextWatermark(...)` method is passed the timestamp that was assigned in the `extractTimestamp(...)`
-method, and can decide whether it wants to generate a watermark. Whenever the `checkAndGetNextWatermark(...)`
-method returns a non-null watermark, and that watermark is larger than the latest previous watermark, that
-new watermark will be emitted.
+This is how you can implement a punctuated generator that emits a watermark whenever an event indicates that it carries a certain marker:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
+public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
 
-	@Override
-	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-		return element.getCreationTime();
-	}
+    @Override
+    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
+        if (event.hasWatermarkMarker()) {
+            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
+        }
+    }
 
-	@Override
-	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
-		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
-	}
+    @Override
+    public void onPeriodicEmit(WatermarkOutput output) {
+        // don't need to do anything because we emit in reaction to events above
+    }
 }
 {% endhighlight %}
 </div>
@@ -316,59 +331,53 @@ public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEv
 {% highlight scala %}
-class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
+class PunctuatedAssigner extends WatermarkGenerator[MyEvent] {
 
-	override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-		element.getCreationTime
-	}
+    override def onEvent(event: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
+        if (event.hasWatermarkMarker()) {
+            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()))
+        }
+    }
 
-	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
-		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
-	}
+    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
+        // don't need to do anything because we emit in reaction to events above
+    }
 }
 {% endhighlight %}
 </div>
 </div>
 
-*Note:* It is possible to generate a watermark on every single event. However, because each watermark causes some
-computation downstream, an excessive number of watermarks degrades performance.
-
+<div class="alert alert-warning">
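+<strong>Note</strong>: It is possible to generate a watermark on every single event. However, because each watermark causes some computation downstream, an excessive number of watermarks degrades performance.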
+</div>
 
-## Timestamps per Kafka Partition
+<a name="watermark-strategies-and-the-kafka-connector"></a>
 
-When using [Apache Kafka](connectors/kafka.html) as a data source, each Kafka partition may have a simple event time pattern (ascending
-timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel,
-interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka's consumer clients work).
+## Watermark Strategies and the Kafka Connector
 
-In that case, you can use Flink's Kafka-partition-aware watermark generation. Using that feature, watermarks are generated inside the
-Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.
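+When using the [Apache Kafka connector](connectors/kafka.html) as a data source, each Kafka partition may have a simple event time pattern (ascending timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel, interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka's consumer clients work).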
 
-For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the
-[ascending timestamps watermark generator](event_timestamp_extractors.html#assigners-with-ascending-timestamps) will result in perfect overall watermarks.
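+In that case, you can use Flink's Kafka-partition-aware watermark generation. Using that feature, watermarks are generated inside the Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.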
 
-The illustrations below show how to use the per-Kafka-partition watermark generation, and how watermarks propagate through the
-streaming dataflow in that case.
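+For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the [monotonously increasing timestamps watermark generator](event_timestamp_extractors.html#monotonously-increasing-timestamps) will result in perfect overall watermarks. Note that the example below does not provide a `TimestampAssigner`; the timestamps of the Kafka records themselves are used instead.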
 
+The illustrations below show how to use per-Kafka-partition watermark generation, and how watermarks propagate through the streaming dataflow in that case.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-FlinkKafkaConsumer010<MyType> kafkaSource = new FlinkKafkaConsumer010<>("myTopic", schema, props);
-kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
-
-    @Override
-    public long extractAscendingTimestamp(MyType element) {
-        return element.eventTimestamp();
-    }
-});
+FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
+kafkaSource.assignTimestampsAndWatermarks(
+        WatermarkStrategy
+                .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
 
 DataStream<MyType> stream = env.addSource(kafkaSource);
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val kafkaSource = new FlinkKafkaConsumer010[MyType]("myTopic", schema, props)
-kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
-    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
-})
+val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
+kafkaSource.assignTimestampsAndWatermarks(
+  WatermarkStrategy
+    .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20)))
 
 val stream: DataStream[MyType] = env.addSource(kafkaSource)
 {% endhighlight %}
@@ -377,4 +386,18 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+<a name="how-operators-process-watermarks"></a>
+
+## How Operators Process Watermarks
+
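+As a general rule, operators are required to completely process a given watermark before forwarding it downstream. For example, `WindowOperator` will first evaluate all windows that should be fired, and only after producing all of the output triggered by the watermark will the watermark itself be sent downstream. In other words, all elements produced due to the occurrence of a watermark will be emitted before the watermark.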
+
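+The same rule applies to `TwoInputStreamOperator`. However, in this case the current watermark of the operator is defined as the minimum of the watermarks of both of its inputs.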
+
+The details of this behavior are defined by the implementations of the `OneInputStreamOperator#processWatermark`, `TwoInputStreamOperator#processWatermark1` and `TwoInputStreamOperator#processWatermark2` methods.
+
+## The Deprecated AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks
+
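+Prior to introducing the current abstraction of `WatermarkStrategy`, `TimestampAssigner`, and `WatermarkGenerator`, Flink used `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks`. You will still see them in the API, but it is recommended to use the new interfaces because they offer a clearer separation of concerns and also unify the periodic and punctuated styles of watermark generation.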
+
 {% top %}
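The idle-source problem and the "minimum over inputs" rule for `TwoInputStreamOperator` can be illustrated with a small Flink-independent model. In this hypothetical `CombinedWatermarkSketch`, an operator's watermark is the minimum over its active inputs, inputs marked idle are excluded (the effect `withIdleness(...)` is meant to achieve), and the combined watermark never moves backwards. This is a simplified model under those assumptions, not Flink's implementation.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: the combined watermark of an operator with several inputs is the
 * minimum over all non-idle inputs and never decreases.
 */
public class CombinedWatermarkSketch {

    private final long[] watermarks;
    private final boolean[] idle;
    private long lastEmitted = Long.MIN_VALUE;

    public CombinedWatermarkSketch(int numInputs) {
        watermarks = new long[numInputs];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no input has reported progress yet
        idle = new boolean[numInputs];
    }

    /** A new watermark from one input; receiving it makes that input active again. */
    public void updateWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false;
    }

    /** Marks an input as idle so that it no longer holds back the combined watermark. */
    public void markIdle(int input) {
        idle[input] = true;
    }

    /** Minimum over active inputs; keeps the previous value if that minimum would go back. */
    public long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch op = new CombinedWatermarkSketch(2);
        op.updateWatermark(0, 10_000L);
        System.out.println(op.combined()); // Long.MIN_VALUE: input 1 never reported
        op.markIdle(1);
        System.out.println(op.combined()); // 10000: idle input 1 is ignored
        op.updateWatermark(0, 20_000L);
        System.out.println(op.combined()); // 20000
    }
}
```

Without the `markIdle` call, the silent second input would keep the combined watermark at `Long.MIN_VALUE` indefinitely, which is the stall described in the section on idle sources.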