Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/14 05:40:21 UTC

[GitHub] [flink] klion26 commented on a change in pull request #12665: [FLINK-17886][docs-zh] Update Chinese documentation for new Watermark…

klion26 commented on a change in pull request #12665:
URL: https://github.com/apache/flink/pull/12665#discussion_r454109437



##########
File path: docs/dev/event_timestamp_extractors.zh.md
##########
@@ -25,83 +25,50 @@ under the License.
 * toc
 {:toc}
 
-As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending
-on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of
-the incoming records, e.g. whenever a special element is encountered in the stream.
+如[生成 Watermark]({{ site.baseurl }}/zh/dev/event_timestamps_watermarks.html) 小节中所述,Flink 提供的抽象方法可以允许用户自己去定义时间戳分配方式和 watermark 发出的方式。具体就是你可以通过实现 `WatermarkGenerator` 接口来实现上述功能。
 
-In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.
-This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example
-for custom implementations.
+为了进一步简化此类任务的编程工作,Flink 框架预设了一些时间戳分配器。本节后续内容有举例。除了开箱即用的已有实现外,其还可以作为自定义实现的示例以供参考。
 
-### **Assigners with ascending timestamps**
+## 时间戳单调递增
 
-The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task
-occur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps will
-arrive.
+*周期性* watermark 生成方式的一个最简单特例就是你给定的数据源中数据的时间戳升序出现。在这种情况下,当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。
 
-Note that it is only necessary that timestamps are ascending *per parallel data source task*. For example, if
-in a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary that
-timestamps are ascending within each Kafka partition. Flink's watermark merging mechanism will generate correct
-watermarks whenever parallel streams are shuffled, unioned, connected, or merged.
+注意:在 Flink 应用程序中,如果是并行数据源,则只要求并行数据源中的每个*单分区数据源任务*时间戳递增。例如,设置每一个并行数据源实例都只读取一个 Kafka 分区,则时间戳只需在每个 Kafka 分区内递增即可。Flink 的 watermark 合并机制会在并行数据流进行分发(shuffle)、联合(union)、连接(connect)或合并(merge)时生成正确的 watermark。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks =
-    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
-
-        @Override
-        public long extractAscendingTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
+WatermarkStrategies
+        .<MyType>forMonotonousTimestamps()
+        .build();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
+WatermarkStrategies
+  .forMonotonousTimestamps[MyType]()
+  .build()
 {% endhighlight %}
 </div>
 </div>
 
-### **Assigners allowing a fixed amount of lateness**
+## 数据之间存在最大固定延迟
 
-Another example of periodic watermark generation is when the watermark lags behind the maximum (event-time) timestamp
-seen in the stream by a fixed amount of time. This case covers scenarios where the maximum lateness that can be encountered in a
-stream is known in advance, e.g. when creating a custom source containing elements with timestamps spread within a fixed period of
-time for testing. For these cases, Flink provides the `BoundedOutOfOrdernessTimestampExtractor` which takes as an argument
-the `maxOutOfOrderness`, i.e. the maximum amount of time an element is allowed to be late before being ignored when computing the
-final result for the given window. Lateness corresponds to the result of `t - t_w`, where `t` is the (event-time) timestamp of an
-element, and `t_w` that of the previous watermark. If `lateness > 0` then the element is considered late and is, by default, ignored when computing
-the result of the job for its corresponding window. See the documentation about [allowed lateness]({{ site.baseurl }}/dev/stream/operators/windows.html#allowed-lateness)
-for more information about working with late elements.
+另一个周期性 watermark 生成的典型例子是,watermark 滞后于数据流中最大(事件时间)时间戳一个固定的时间量。该示例可以覆盖的场景是你预先知道数据流中的数据可能遇到的最大延迟,例如,在测试场景下创建了一个自定义数据源,并且这个数据源的产生的数据的时间戳在一个固定范围之内。Flink 针对上述场景提供了 `boundedOutfordernessWatermarks` 生成器,该生成器将 `maxOutOfOrderness` 作为参数,该参数代表在计算给定窗口的结果时,允许元素被忽略计算之前延迟到达的最长时间。其中延迟时长就等于 `t_w - t` ,其中 `t` 代表元素的(事件时间)时间戳,`t_w` 代表前一个 watermark 对应的(事件时间)时间戳。如果 `lateness > 0`,则认为该元素迟到了,并且在计算相应窗口的结果时默认会被忽略。有关使用延迟元素的详细内容,请参阅有关[允许延迟]({{ site.baseurl}}/zh/dev/stream/operators/windows.html#allowed-lateness)的文档。

Review comment:
       For links, I suggest using the `{%link %}` form.
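The hunk above defines lateness as `t_w - t` (previous watermark minus element timestamp). The removed English line wrote `t - t_w`. Under the rule "late if `lateness > 0`", only `t_w - t` flags elements whose timestamp has fallen behind the watermark. The following Flink-free Java sketch shows that rule together with a bounded-out-of-orderness watermark, and makes no use of Flink's API. The class `BoundedLatenessSketch` is hypothetical. The `max - bound - 1` formula is an assumption modeled on the `currentMaxTimestamp - maxOutOfOrderness - 1` expression in this diff's `BoundedOutOfOrdernessGenerator`. The sketch also simplifies emission by acting as if a watermark were emitted after every element.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free sketch: tracks the highest event timestamp seen so far
 * and derives a watermark that lags it by a fixed bound.
 */
class BoundedLatenessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedLatenessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so that the first watermark is exactly Long.MIN_VALUE without overflowing.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    /** Remembers the largest event timestamp observed. */
    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    /** Watermark = highest timestamp seen - bound - 1 (assumed formula, see lead-in). */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness - 1;
    }

    /** Lateness as defined in the text: t_w - t. May overflow for sentinel watermarks. */
    static long lateness(long eventTimestamp, long previousWatermark) {
        return previousWatermark - eventTimestamp;
    }

    /** Equivalent to lateness(...) > 0, written as a comparison to avoid overflow. */
    static boolean isLate(long eventTimestamp, long previousWatermark) {
        return previousWatermark > eventTimestamp;
    }

    /**
     * Feeds timestamps in arrival order and returns those that arrive late.
     * Simplification: the watermark is treated as re-emitted after every element.
     */
    static List<Long> lateElements(long[] timestamps, long maxOutOfOrderness) {
        BoundedLatenessSketch gen = new BoundedLatenessSketch(maxOutOfOrderness);
        List<Long> late = new ArrayList<>();
        for (long t : timestamps) {
            // Compare against the watermark in effect before this element arrives.
            if (isLate(t, gen.currentWatermark())) {
                late.add(t);
            }
            gen.onEvent(t);
        }
        return late;
    }
}
```

For example, with a 3500 ms bound and arrivals `1000, 5000, 3000, 1000`, the watermark reaches 1499 after the 5000 event. The 3000 event is still on time. The second 1000 event has lateness 499 and counts as late.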

##########
File path: docs/dev/event_time.zh.md
##########
@@ -67,21 +67,18 @@ stream
 {% highlight python %}
 env = StreamExecutionEnvironment.get_execution_environment()
 
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
 
-# alternatively:
-# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
-# env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
 {% endhighlight %}

Review comment:
       Why was this section changed?

##########
File path: docs/dev/event_timestamps_watermarks.zh.md
##########
@@ -22,115 +22,77 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+在本节中,你将了解 Flink 中用于处理**事件时间**的时间戳和 watermark 相关的 API。有关*事件时间*,*处理时间*和*摄取时间*的介绍,请参阅[事件时间概览]({{ site.baseurl }}/zh/dev/event_time.html)小节。
+
 * toc
 {:toc}
 
+## Watermark 策略简介

Review comment:
       I suggest adding an `<a>` tag to the heading.

##########
File path: docs/dev/event_timestamps_watermarks.zh.md
##########
@@ -175,200 +137,243 @@ withTimestampsAndWatermarks
 </div>
 </div>
 
+使用 `WatermarkStrategy` 去获取流并生成带有时间戳的元素和 watermark 的新流时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳分配器将覆盖原有的时间戳和 watermark。
 
-#### **With Periodic Watermarks**
+## 处理空闲数据源
 
-`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
-on the stream elements, or purely based on processing time).
+如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 `WatermarkGenerator` 也不会获得任何新数据去生成 watermark。我们称这类数据源为*空闲输入*或*空闲源*。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。
 
-The interval (every *n* milliseconds) in which the watermark will be generated is defined via
-`ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner's `getCurrentWatermark()` method will be
-called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous
-watermark.
+为了解决这个问题,你可以使用 `WatermarkStrategy` 来检测空闲输入并将其标记为空闲状态。`WatermarkStrategies` 为此提供了一个工具接口:
 
-Here we show two simple examples of timestamp assigners that use periodic watermark generation. Note that Flink ships with a `BoundedOutOfOrdernessTimestampExtractor` similar to the `BoundedOutOfOrdernessGenerator` shown below, which you can read about [here]({{ site.baseurl }}/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness).
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+WatermarkStrategies
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withIdleness(Duration.ofMinutes(1))
+        .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+WatermarkStrategies
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withIdleness(Duration.ofMinutes(1))
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+
+## 自定义 WatermarkGenerator
+
+`TimestampAssigner` 是一个可以从事件数据中提取时间戳字段的简单函数,我们无需详细查看其实现。但是 `WatermarkGenerator` 的编写相对就要复杂一些了,我们将在接下来的两小节中介绍如何实现此接口。WatermarkGenerator 接口代码如下:
+
+{% highlight java %}
+/**
+ * The {@code WatermarkGenerator} generates watermarks either based on events or
+ * periodically (in a fixed interval).
+ *
+ * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
+ * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
+ */
+@Public
+public interface WatermarkGenerator<T> {
+
+    /**
+     * Called for every event, allows the watermark generator to examine and remember the
+     * event timestamps, or to emit a watermark based on the event itself.
+     */
+    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
+
+    /**
+     * Called periodically, and might emit a new watermark, or not.
+     *
+     * <p>The interval in which this method is called and Watermarks are generated
+     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
+     */
+    void onPeriodicEmit(WatermarkOutput output);
+}
+{% endhighlight %}
+
+watermark 的生成方式本质上是有两种:*周期性生成*和*标记生成*。
+
+周期性生成器通常通过 `onEvent()` 观察传入的事件数据,然后在框架调用 `onPeriodicEmit()` 时发出 watermark。
+
+标记生成器将查看 `onEvent()` 中的事件数据,并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出 watermark。通常情况下,标记生成器不会通过 `onPeriodicEmit()` 发出 watermark。
+
+接下来,我们将学习如何实现上述两类生成器。
+
+### 自定义周期性 Watermark 生成器
+
+周期性生成器会观察流事件数据并定期生成 watermark(其生成可能取决于流数据,或者完全基于处理时间)。
+
+生成 watermark 的时间间隔(每 *n* 毫秒)可以通过 `ExecutionConfig.setAutoWatermarkInterval(...)` 指定。每次都会调用生成器的 `onPeriodicEmit()` 方法,如果返回的 watermark 非空且值大于前一个 watermark,则将发出新的 watermark。
+
+如下是两个使用周期性 watermark 生成器的简单示例。注意:Flink 已经附带了 `BoundedOutOfOrdernessWatermarks`,它实现了 `WatermarkGenerator`,其工作原理与下面的 `BoundedOutOfOrdernessGenerator` 相似。可以在[这里]({{ site.baseurl }}/zh/dev/event_timestamp_extractors.html#数据之间存在最大固定延迟)参阅如何使用它的内容。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 /**
- * This generator generates watermarks assuming that elements arrive out of order,
- * but only to a certain degree. The latest elements for a certain timestamp t will arrive
- * at most n milliseconds after the earliest elements for timestamp t.
+ * 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。
+ * 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。
  */
-public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
+public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
 
-    private final long maxOutOfOrderness = 3500; // 3.5 seconds
+    private final long maxOutOfOrderness = 3500; // 3.5 秒
 
     private long currentMaxTimestamp;
 
     @Override
-    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-        long timestamp = element.getCreationTime();
-        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
-        return timestamp;
+    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
+        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
     }
 
     @Override
-    public Watermark getCurrentWatermark() {
-        // return the watermark as current highest timestamp minus the out-of-orderness bound
-        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
+    public void onPeriodicEmit(WatermarkOutput output) {
+        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
+        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
     }
+
 }
 
 /**
- * This generator generates watermarks that are lagging behind processing time by a fixed amount.
- * It assumes that elements arrive in Flink after a bounded delay.
+ * 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。
  */
-public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
+public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
 
-	private final long maxTimeLag = 5000; // 5 seconds
+    private final long maxTimeLag = 5000; // 5 秒
 
-	@Override
-	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-		return element.getCreationTime();
-	}
+    @Override
+    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
+        // 处理时间场景下不需要实现
+    }
 
-	@Override
-	public Watermark getCurrentWatermark() {
-		// return the watermark as current time minus the maximum time lag
-		return new Watermark(System.currentTimeMillis() - maxTimeLag);
-	}
+    @Override
+    public void onPeriodicEmit(WatermarkOutput output) {
+        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
+    }
 }
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 /**
- * This generator generates watermarks assuming that elements arrive out of order,
- * but only to a certain degree. The latest elements for a certain timestamp t will arrive
- * at most n milliseconds after the earliest elements for timestamp t.
+ * 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。
+ * 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。
  */
 class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
 
-    val maxOutOfOrderness = 3500L // 3.5 seconds
+    val maxOutOfOrderness = 3500L // 3.5 秒
 
     var currentMaxTimestamp: Long = _
 
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        val timestamp = element.getCreationTime()
-        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
-        timestamp
+    override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
+        currentMaxTimestamp = max(eventTimestamp, currentMaxTimestamp)
     }
 
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current highest timestamp minus the out-of-orderness bound
-        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
+    override def onPeriodicEmit(): Unit = {
+        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
+        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
     }
 }
 
 /**
- * This generator generates watermarks that are lagging behind processing time by a fixed amount.
- * It assumes that elements arrive in Flink after a bounded delay.
+ * 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。
  */
 class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
 
-    val maxTimeLag = 5000L // 5 seconds
+    val maxTimeLag = 5000L // 5 秒
 
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        element.getCreationTime
+    override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
+        // 处理时间场景下不需要实现
     }
 
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current time minus the maximum time lag
-        new Watermark(System.currentTimeMillis() - maxTimeLag)
+    override def onPeriodicEmit(): Unit = {
+        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-#### **With Punctuated Watermarks**
+### 自定义标记 Watermark 生成器
 
-To generate watermarks whenever a certain event indicates that a new watermark might be generated, use
-`AssignerWithPunctuatedWatermarks`. For this class Flink will first call the `extractTimestamp(...)` method
-to assign the element a timestamp, and then immediately call the
-`checkAndGetNextWatermark(...)` method on that element.
+标记 watermark 生成器观察流事件数据并在获取到带有 watermark 信息的特殊事件元素时发出 watermark。
 
-The `checkAndGetNextWatermark(...)` method is passed the timestamp that was assigned in the `extractTimestamp(...)`
-method, and can decide whether it wants to generate a watermark. Whenever the `checkAndGetNextWatermark(...)`
-method returns a non-null watermark, and that watermark is larger than the latest previous watermark, that
-new watermark will be emitted.
+如下是实现标记生成器的方法,当事件带有某个指定标记时,该生成器就会发出 watermark:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
+public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
 
-	@Override
-	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-		return element.getCreationTime();
-	}
+    @Override
+    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
+        if (event.hasWatermarkMarker()) {
+            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
+        }
+    }
 
-	@Override
-	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
-		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
-	}
+    @Override
+    public void onPeriodicEmit(WatermarkOutput output) {
+        // onEvent 中已经实现
+    }
 }
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
 
-	override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-		element.getCreationTime
-	}
+    override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
+        if (event.hasWatermarkMarker()) {
+            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()))
+        }
+    }
 
-	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
-		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
-	}
+    override def onPeriodicEmit(): Unit = {
+        // onEvent 中已经实现
+    }
 }
 {% endhighlight %}
 </div>
 </div>
 
-*Note:* It is possible to generate a watermark on every single event. However, because each watermark causes some
-computation downstream, an excessive number of watermarks degrades performance.
-
-
-## Timestamps per Kafka Partition
+<div class="alert alert-warning">
+<strong>注意</strong>:可以针对每个事件去生成 watermark。但是由于每个 watermark 都会在下游做一些计算,因此过多的 watermark 会降低程序性能。
+</div>
 
-When using [Apache Kafka](connectors/kafka.html) as a data source, each Kafka partition may have a simple event time pattern (ascending
-timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel,
-interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka's consumer clients work).
+## Watermark 策略与 Kafka 连接器

Review comment:
       Add an `<a>` tag to the heading.
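The idle-source paragraph in this hunk says that a downstream operator takes the minimum of its upstream watermarks. As a result, a single silent partition holds everything back until that partition is marked idle, for example via `withIdleness(Duration.ofMinutes(1))`. The following Flink-free Java sketch illustrates that combination rule. `IdleAwareWatermarkCombiner` and its methods are hypothetical. The sketch assumes that idle inputs are simply skipped and that the combined watermark never moves backwards. This is a simplification of what Flink does internally, not its actual implementation.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/**
 * Hypothetical sketch: combines the watermarks of several parallel inputs by taking
 * the minimum over the inputs that are not currently marked idle.
 */
class IdleAwareWatermarkCombiner {
    private final long[] watermarks;
    private final boolean[] idle;
    private long lastCombined = Long.MIN_VALUE;

    IdleAwareWatermarkCombiner(int numInputs) {
        watermarks = new long[numInputs];
        idle = new boolean[numInputs];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no progress reported yet
    }

    /** A new watermark from an input; hearing from an input makes it active again. */
    void onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false;
    }

    /** Called when an input has produced nothing for the configured idle timeout. */
    void markIdle(int input) {
        idle[input] = true;
    }

    /**
     * Minimum watermark over active inputs, never lower than a value returned earlier.
     * Empty when every input is idle, since nothing can be said about event-time progress.
     */
    OptionalLong combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (!anyActive) {
            return OptionalLong.empty();
        }
        lastCombined = Math.max(lastCombined, min); // keep the output monotonic
        return OptionalLong.of(lastCombined);
    }
}
```

With two inputs, input 0 at watermark 100 and a silent input 1 keep the combined watermark at `Long.MIN_VALUE`. Marking input 1 idle lets the combined watermark advance to 100.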

##########
File path: docs/dev/event_timestamp_extractors.zh.md
##########
@@ -25,83 +25,50 @@ under the License.
 * toc
 {:toc}
 
-As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending
-on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of
-the incoming records, e.g. whenever a special element is encountered in the stream.
+如[生成 Watermark]({{ site.baseurl }}/zh/dev/event_timestamps_watermarks.html) 小节中所述,Flink 提供的抽象方法可以允许用户自己去定义时间戳分配方式和 watermark 发出的方式。具体就是你可以通过实现 `WatermarkGenerator` 接口来实现上述功能。

Review comment:
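       I suggest writing the link here in the form `{%link dev/event_timestamps_watermarks.zh.md %}`;
   for details, see the [mailing list](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Reminder-Prefer-link-tag-in-documentation-td42362.html).
   
   Would `watermark 发出` ("emit watermarks") -> `watermark 生成` ("generate watermarks") be better? The heading also uses 生成, so this would keep the terminology consistent throughout.
   
   Would shortening `具体就是你可以通过实现` ("specifically, you can ... by implementing") to `你可以通过实现` ("you can ... by implementing") read better?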

##########
File path: docs/dev/event_timestamps_watermarks.zh.md
##########
@@ -22,115 +22,77 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+在本节中,你将了解 Flink 中用于处理**事件时间**的时间戳和 watermark 相关的 API。有关*事件时间*,*处理时间*和*摄取时间*的介绍,请参阅[事件时间概览]({{ site.baseurl }}/zh/dev/event_time.html)小节。

Review comment:
       For links, I suggest using the `{%link %}` form.

##########
File path: docs/dev/event_time.zh.md
##########
@@ -24,13 +24,13 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-本节你将学到如何写可感知时间变化的 Flink 程序,可以先看看[实时流处理]({% link concepts/timely-stream-processing.zh.md %})了解相关概念。
+在本节中,你将学习编写可对时间进行感知(time-aware)的 Flink 程序。可以参阅[实时流处理]({% link concepts/timely-stream-processing.zh.md %})小节以了解实时流处理的概念。

Review comment:
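       Would the original wording `可感知时间变化` ("aware of changes in time") be better here than `可对时间进行感知` ("able to perceive time")? The current meaning works, but it reads a bit awkwardly.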

##########
File path: docs/dev/event_timestamp_extractors.zh.md
##########
@@ -25,83 +25,50 @@ under the License.
 * toc
 {:toc}
 
-As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending
-on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of
-the incoming records, e.g. whenever a special element is encountered in the stream.
+如[生成 Watermark]({{ site.baseurl }}/zh/dev/event_timestamps_watermarks.html) 小节中所述,Flink 提供的抽象方法可以允许用户自己去定义时间戳分配方式和 watermark 发出的方式。具体就是你可以通过实现 `WatermarkGenerator` 接口来实现上述功能。
 
-In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.
-This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example
-for custom implementations.
+为了进一步简化此类任务的编程工作,Flink 框架预设了一些时间戳分配器。本节后续内容有举例。除了开箱即用的已有实现外,其还可以作为自定义实现的示例以供参考。
 
-### **Assigners with ascending timestamps**
+## 时间戳单调递增

Review comment:
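       About this heading:
   1. I suggest adding an `<a>` tag, so that other pages referencing this heading can keep using the original link; for details, see the [wiki](https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications).
   2. I suggest also translating the meaning of `assigner` here.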

##########
File path: docs/dev/event_timestamp_extractors.zh.md
##########
@@ -25,83 +25,50 @@ under the License.
 * toc
 {:toc}
 
-As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending
-on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of
-the incoming records, e.g. whenever a special element is encountered in the stream.
+如[生成 Watermark]({{ site.baseurl }}/zh/dev/event_timestamps_watermarks.html) 小节中所述,Flink 提供的抽象方法可以允许用户自己去定义时间戳分配方式和 watermark 发出的方式。具体就是你可以通过实现 `WatermarkGenerator` 接口来实现上述功能。
 
-In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.
-This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example
-for custom implementations.
+为了进一步简化此类任务的编程工作,Flink 框架预设了一些时间戳分配器。本节后续内容有举例。除了开箱即用的已有实现外,其还可以作为自定义实现的示例以供参考。
 
-### **Assigners with ascending timestamps**
+## 时间戳单调递增
 
-The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task
-occur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps will
-arrive.
+*周期性* watermark 生成方式的一个最简单特例就是你给定的数据源中数据的时间戳升序出现。在这种情况下,当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。
 
-Note that it is only necessary that timestamps are ascending *per parallel data source task*. For example, if
-in a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary that
-timestamps are ascending within each Kafka partition. Flink's watermark merging mechanism will generate correct
-watermarks whenever parallel streams are shuffled, unioned, connected, or merged.
+注意:在 Flink 应用程序中,如果是并行数据源,则只要求并行数据源中的每个*单分区数据源任务*时间戳递增。例如,设置每一个并行数据源实例都只读取一个 Kafka 分区,则时间戳只需在每个 Kafka 分区内递增即可。Flink 的 watermark 合并机制会在并行数据流进行分发(shuffle)、联合(union)、连接(connect)或合并(merge)时生成正确的 watermark。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks =
-    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
-
-        @Override
-        public long extractAscendingTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
+WatermarkStrategies
+        .<MyType>forMonotonousTimestamps()
+        .build();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
+WatermarkStrategies
+  .forMonotonousTimestamps[MyType]()
+  .build()
 {% endhighlight %}
 </div>
 </div>
 
-### **Assigners allowing a fixed amount of lateness**
+## 数据之间存在最大固定延迟

Review comment:
       For this heading, I suggest:
   1. adding an `<a>` tag;
   2. adding a translation of `assigner`.
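In the ascending-timestamp case in this hunk (`forMonotonousTimestamps()`), each source task's watermark can simply follow the latest timestamp that task has seen. The following Flink-free Java sketch illustrates this. `MonotonousWatermarkSketch` is a hypothetical name. Emitting `max - 1` rather than `max` is an assumption: it treats ascending timestamps as a bounded-out-of-orderness generator with a zero bound, which is how we understand Flink's own implementation to work. The sketch also shows why the requirement applies per source task. Two partitions that are each ascending stop being ordered once a reader interleaves them.

```java
/** Hypothetical sketch of a watermark generator for per-task ascending timestamps. */
class MonotonousWatermarkSketch {
    // Chosen so that the first watermark is exactly Long.MIN_VALUE.
    private long maxTimestamp = Long.MIN_VALUE + 1;

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Zero-bound case: the watermark trails the highest timestamp by 1 ms. */
    long currentWatermark() {
        return maxTimestamp - 1;
    }

    /** True if timestamps never decrease, i.e. the per-task assumption holds. */
    static boolean isAscending(long[] timestamps) {
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i] < timestamps[i - 1]) {
                return false;
            }
        }
        return true;
    }

    /** Interleaves two partitions element by element, as a single reader might. */
    static long[] interleave(long[] a, long[] b) {
        long[] out = new long[a.length + b.length];
        int i = 0, j = 0, k = 0;
        while (i < a.length || j < b.length) {
            if (i < a.length) out[k++] = a[i++];
            if (j < b.length) out[k++] = b[j++];
        }
        return out;
    }
}
```

For instance, partitions `{10, 20, 30}` and `{5, 15, 25}` are each ascending. Interleaved, they become `{10, 5, 20, 15, 30, 25}`, which is not ascending. This is why one generator per partition (or per source task) is needed, with the downstream watermark merged from them.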

##########
File path: docs/dev/event_timestamps_watermarks.zh.md
##########
@@ -22,115 +22,77 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+在本节中,你将了解 Flink 中用于处理**事件时间**的时间戳和 watermark 相关的 API。有关*事件时间*,*处理时间*和*摄取时间*的介绍,请参阅[事件时间概览]({{ site.baseurl }}/zh/dev/event_time.html)小节。
+
 * toc
 {:toc}
 
+## Watermark 策略简介
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
+为了使用*事件时间*语义,Flink 应用程序需要知道事件*时间戳*对应的字段,意味着数据流中的每个元素都需要拥有*可分配*的事件时间戳。其通常通过使用 `TimestampAssigner` API 从元素中的某个字段去访问/提取时间戳。
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
-progress in event time.
+时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 `WatermarkGenerator` 来配置 watermark 的生成方式。
 
-There are two ways to assign timestamps and generate watermarks:
+使用 Flink API 时需要设置一个同时包含 `TimestampAssigner` 和 `WatermarkGenerator` 的 `WatermarkStrategy`。`WatermarkStrategies` 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。WatermarkStrategy 接口如下:
 
-  1. Directly in the data stream source
-  2. Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted
-
-<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
-milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
-
-### Source Functions with Timestamps and Watermarks
+{% highlight java %}
+public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
 
-Stream sources can directly assign timestamps to the elements they produce, and they can also emit watermarks.
-When this is done, no timestamp assigner is needed.
-Note that if a timestamp assigner is used, any timestamps and watermarks provided by the source will be overwritten.
+    /**
+     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
+     * strategy.
+     */
+    @Override
+    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
 
-To assign a timestamp to an element in the source directly, the source must use the `collectWithTimestamp(...)`
-method on the `SourceContext`. To generate watermarks, the source must call the `emitWatermark(Watermark)` function.
+    /**
+     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
+     */
+    @Override
+    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+}
+{% endhighlight %}
 
-Below is a simple example of a *(non-checkpointed)* source that assigns timestamps and generates watermarks:
+如上所述,通常情况下,你不用实现此接口,而是可以使用 `WatermarkStrategies` 工具类中通用的 watermark 策略,或者可以使用这个工具类将自定义的 `TimestampAssigner` 与 `WatermarkGenerator` 进行绑定。例如,你想要要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-@Override
-public void run(SourceContext<MyType> ctx) throws Exception {
-	while (/* condition */) {
-		MyType next = getNext();
-		ctx.collectWithTimestamp(next, next.getEventTimestamp());
-
-		if (next.hasWatermarkTime()) {
-			ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
-		}
-	}
-}
+WatermarkStrategies
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withTimestampAssigner((event, timestamp) -> event.f0)
+        .build();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-override def run(ctx: SourceContext[MyType]): Unit = {
-	while (/* condition */) {
-		val next: MyType = getNext()
-		ctx.collectWithTimestamp(next, next.eventTimestamp)
-
-		if (next.hasWatermarkTime) {
-			ctx.emitWatermark(new Watermark(next.getWatermarkTime))
-		}
-	}
-}
+WatermarkStrategies
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
+    override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1
+  })
+  .build()
 {% endhighlight %}
+
+(Using Scala Lambdas here currently doesn't work because Scala is stupid and it's hard to support this. #fus)
 </div>
 </div>
 
+其中 `TimestampAssigner` 的设置与否是可选的,大多数情况下,可以不用去特别指定。例如,当使用 Kafka 或 Kinesis 数据源时,你可以直接从 Kafka/Kinesis 数据源记录中获取到时间戳。
 
-### Timestamp Assigners / Watermark Generators
+稍后我们将在[自定义 WatermarkGenerator](#自定义-watermarkgenerator) 小节学习 WatermarkGenerator 接口。
 
-Timestamp assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
-original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
+<div class="alert alert-warning">
+<strong>Attention</strong>: Both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
+</div>
 
-Timestamp assigners are usually specified immediately after the data source, but it is not strictly required to do so.
-A common pattern, for example, is to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
-In any case, the timestamp assigner needs to be specified before the first operation on event time
-(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
-Flink allows the specification of a timestamp assigner / watermark emitter inside
-the source (or consumer) itself. More information on how to do so can be found in the
-[Kafka Connector documentation]({{ site.baseurl }}/dev/connectors/kafka.html).
+## Using Watermark Strategies

Review comment:
       Add an `<a>` tag.
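The bounded-out-of-orderness strategy in the hunk above can be modeled in plain Java to show what it computes. This is a minimal sketch, not Flink's implementation. `BoundedOutOfOrdernessSketch` and `BoundedOutOfOrdernessDemo` are hypothetical names, and `Map.Entry<Long, String>` stands in for `Tuple2<Long, String>`. The `- 1` in the watermark formula reflects our understanding of Flink's `BoundedOutOfOrdernessWatermarks` and should be checked against the source. As the note in the hunk says, all timestamps are epoch milliseconds.

```java
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for a bounded-out-of-orderness strategy: a timestamp
// assigner plus a generator that tracks the largest timestamp seen so far.
// This is NOT Flink's BoundedOutOfOrdernessWatermarks class.
class BoundedOutOfOrdernessSketch<T> {
    private final ToLongFunction<T> timestampAssigner; // returns epoch millis
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness, ToLongFunction<T> timestampAssigner) {
        this.timestampAssigner = timestampAssigner;
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Chosen so that the initial watermark is exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Per event: extract the timestamp and remember the maximum.
    void onEvent(T event) {
        maxTimestamp = Math.max(maxTimestamp, timestampAssigner.applyAsLong(event));
    }

    // Periodically: elements up to this timestamp are assumed to have arrived.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}

class BoundedOutOfOrdernessDemo {
    public static void main(String[] args) {
        // (Long, String) events; getKey() plays the role of Tuple2.f0.
        BoundedOutOfOrdernessSketch<Map.Entry<Long, String>> generator =
                new BoundedOutOfOrdernessSketch<>(Duration.ofSeconds(20), event -> event.getKey());
        generator.onEvent(new SimpleEntry<>(100_000L, "on time"));
        generator.onEvent(new SimpleEntry<>(90_000L, "late but within the bound"));
        System.out.println(generator.currentWatermark()); // 79999
    }
}
```

The late event at 90 000 ms does not move the watermark backwards. The watermark trails the largest timestamp by the 20-second bound.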

##########
File path: docs/dev/event_timestamps_watermarks.zh.md
##########
@@ -175,200 +137,243 @@ withTimestampsAndWatermarks
 </div>
 </div>
 
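+When a `WatermarkStrategy` is used to take a stream and produce a new stream with timestamped elements and watermarks, any timestamps or watermarks the original stream already had are overwritten by the newly specified timestamp assigner.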
 
-#### **With Periodic Watermarks**
+## Dealing With Idle Sources
 
-`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
-on the stream elements, or purely based on processing time).
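+If one of the partitions/shards of a source does not carry events for a while, the `WatermarkGenerator` also gets no new data on which to base a watermark. We call this an *idle input* or an *idle source*. This becomes a problem when some of the other partitions are still carrying events. A downstream operator's watermark is computed as the minimum over all of its parallel upstream inputs, so its watermark will not progress.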
 
-The interval (every *n* milliseconds) in which the watermark will be generated is defined via
-`ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner's `getCurrentWatermark()` method will be
-called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous
-watermark.
+To solve this, you can use a `WatermarkStrategy` that detects idleness and marks an input as idle. `WatermarkStrategies` provides a convenience method for this:
 
-Here we show two simple examples of timestamp assigners that use periodic watermark generation. Note that Flink ships with a `BoundedOutOfOrdernessTimestampExtractor` similar to the `BoundedOutOfOrdernessGenerator` shown below, which you can read about [here]({{ site.baseurl }}/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness).
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+WatermarkStrategies
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withIdleness(Duration.ofMinutes(1))
+        .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+WatermarkStrategies
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withIdleness(Duration.ofMinutes(1))
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+
+## Custom WatermarkGenerator
+
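+A `TimestampAssigner` is a simple function that extracts a timestamp field from an event, so we don't need to look at its implementation in detail. A `WatermarkGenerator`, however, is a bit more complicated to write. We will look at how to implement one in the next two subsections. This is the `WatermarkGenerator` interface: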
+
+{% highlight java %}
+/**
+ * The {@code WatermarkGenerator} generates watermarks either based on events or
+ * periodically (in a fixed interval).
+ *
+ * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
+ * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
+ */
+@Public
+public interface WatermarkGenerator<T> {
+
+    /**
+     * Called for every event, allows the watermark generator to examine and remember the
+     * event timestamps, or to emit a watermark based on the event itself.
+     */
+    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
+
+    /**
+     * Called periodically, and might emit a new watermark, or not.
+     *
+     * <p>The interval in which this method is called and Watermarks are generated
+     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
+     */
+    void onPeriodicEmit(WatermarkOutput output);
+}
+{% endhighlight %}
+
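+There are two different styles of watermark generation: *periodic* and *punctuated*.
+
+A periodic generator usually observes the incoming events via `onEvent()` and then emits a watermark when the framework calls `onPeriodicEmit()`.
+
+A punctuated generator looks at events in `onEvent()` and waits for special marker events or punctuations in the stream that carry watermark information. When it sees one of these events, it emits a watermark immediately. Usually, punctuated generators don't emit a watermark from `onPeriodicEmit()`.
+
+Next, we will look at how to implement a generator for each style.
+
+### Custom Periodic Watermark Generator
+
+A periodic generator observes stream events and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).
+
+The interval (every *n* milliseconds) at which watermarks are generated is defined via `ExecutionConfig.setAutoWatermarkInterval(...)`. The generator's `onPeriodicEmit()` method is called each time. A new watermark is emitted only if the watermark it passes to the output is larger than the previous one.
+
+Below are two simple examples of watermark generators that use periodic watermark generation. Note that Flink ships with `BoundedOutOfOrdernessWatermarks`, a `WatermarkGenerator` that works similarly to the `BoundedOutOfOrdernessGenerator` shown below. You can read about how to use it [here]({{ site.baseurl }}/zh/dev/event_timestamp_extractors.html#数据之间存在最大固定延迟).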

Review comment:
       Please use the `{%link %}` syntax for links.
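The two styles described in the hunk above, periodic and punctuated, can be contrasted with a small runnable model. This is a sketch under several assumptions. `GeneratorSketch`, `CollectingOutput`, `PeriodicSketch` and `PunctuatedSketch` are hypothetical names that only mirror the shape of the quoted `WatermarkGenerator` interface. Watermarks are plain `long` values rather than Flink `Watermark` objects. The `"WATERMARK"` marker prefix is an invented convention. `CollectingOutput` models the rule from the hunk that a watermark is only emitted if it is larger than the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output: records forwarded watermarks and drops any watermark
// that does not advance past the previous one.
class CollectingOutput {
    final List<Long> emitted = new ArrayList<>();
    private long lastWatermark = Long.MIN_VALUE;

    void emitWatermark(long watermark) {
        if (watermark > lastWatermark) {
            lastWatermark = watermark;
            emitted.add(watermark);
        }
    }
}

// Same shape as the quoted WatermarkGenerator interface, but with our own
// output type so the sketch runs without Flink.
interface GeneratorSketch<T> {
    void onEvent(T event, long eventTimestamp, CollectingOutput output);

    void onPeriodicEmit(CollectingOutput output);
}

// Periodic style: onEvent only observes, onPeriodicEmit emits.
class PeriodicSketch<T> implements GeneratorSketch<T> {
    // Starts one above MIN_VALUE so that "max - 1" cannot underflow.
    private long maxTimestamp = Long.MIN_VALUE + 1;

    @Override
    public void onEvent(T event, long eventTimestamp, CollectingOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(CollectingOutput output) {
        // Assuming in-order input, no element with timestamp <= max - 1 is still expected.
        output.emitWatermark(maxTimestamp - 1);
    }
}

// Punctuated style: emits as soon as a marker event arrives.
class PunctuatedSketch implements GeneratorSketch<String> {
    @Override
    public void onEvent(String event, long eventTimestamp, CollectingOutput output) {
        if (event.startsWith("WATERMARK")) { // hypothetical marker convention
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(CollectingOutput output) {
        // Punctuated generators usually emit nothing here.
    }
}
```

Calling `onPeriodicEmit` twice without new events produces only one watermark, because the second value does not advance.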

##########
File path: docs/dev/event_timestamps_watermarks.zh.md
##########
@@ -22,115 +22,77 @@ specific language governing permissions and limitations
 under the License.
 -->
 
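+In this section you will learn about the APIs that Flink provides for working with **event time** timestamps and watermarks. For an introduction to *event time*, *processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/zh/dev/event_time.html).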
+
 * toc
 {:toc}
 
+## Introduction to Watermark Strategies
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
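+In order to work with *event time*, a Flink application needs to know the events' *timestamps*. This means each element in the stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the timestamp from some field of the element using the `TimestampAssigner` API.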
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
-progress in event time.
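+Timestamp assignment goes hand-in-hand with generating watermarks, which tell the Flink application about progress in event time. You configure how watermarks are generated by specifying a `WatermarkGenerator`.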
 
-There are two ways to assign timestamps and generate watermarks:
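+The Flink API expects a `WatermarkStrategy` that contains both a `TimestampAssigner` and a `WatermarkGenerator`. A number of common strategies are available out of the box in the `WatermarkStrategies` utility class, and users can also build their own strategies when needed. This is the `WatermarkStrategy` interface: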
 
-  1. Directly in the data stream source
-  2. Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted
-
-<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
-milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
-
-### Source Functions with Timestamps and Watermarks
+{% highlight java %}
+public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
 
-Stream sources can directly assign timestamps to the elements they produce, and they can also emit watermarks.
-When this is done, no timestamp assigner is needed.
-Note that if a timestamp assigner is used, any timestamps and watermarks provided by the source will be overwritten.
+    /**
+     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this

Review comment:
       Could these comments be translated as well?
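The quoted `WatermarkStrategy` is expressed as two factory methods, `createTimestampAssigner` and `createWatermarkGenerator`. The Flink-free sketch below shows a strategy that holds recipes rather than state. `StrategySketch` and `MaxTimestampGenerator` are hypothetical names. Our understanding, which should be verified against the Flink source, is that the runtime calls the factory methods once per parallel instance, so each instance gets its own generator state.

```java
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

// Hypothetical per-instance generator state (tracks the max timestamp seen).
class MaxTimestampGenerator {
    private long maxTimestamp = Long.MIN_VALUE;

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long maxTimestamp() {
        return maxTimestamp;
    }
}

// Hypothetical analogue of WatermarkStrategy: it holds recipes, not state, and
// hands out a fresh assigner and generator every time one is requested.
class StrategySketch<T> {
    private final Supplier<ToLongFunction<T>> assignerFactory;
    private final Supplier<MaxTimestampGenerator> generatorFactory;

    StrategySketch(Supplier<ToLongFunction<T>> assignerFactory,
                   Supplier<MaxTimestampGenerator> generatorFactory) {
        this.assignerFactory = assignerFactory;
        this.generatorFactory = generatorFactory;
    }

    ToLongFunction<T> createTimestampAssigner() {
        return assignerFactory.get();
    }

    MaxTimestampGenerator createWatermarkGenerator() {
        return generatorFactory.get();
    }
}
```

In this design, two generators created from the same strategy never share state. A strategy object can therefore be defined once and reused by every parallel instance.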

##########
File path: docs/dev/event_timestamps_watermarks.zh.md
##########
@@ -22,115 +22,77 @@ specific language governing permissions and limitations
 under the License.
 -->
 
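+In this section you will learn about the APIs that Flink provides for working with **event time** timestamps and watermarks. For an introduction to *event time*, *processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/zh/dev/event_time.html).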
+
 * toc
 {:toc}
 
+## Introduction to Watermark Strategies
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
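+In order to work with *event time*, a Flink application needs to know the events' *timestamps*. This means each element in the stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the timestamp from some field of the element using the `TimestampAssigner` API.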
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
-progress in event time.
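+Timestamp assignment goes hand-in-hand with generating watermarks, which tell the Flink application about progress in event time. You configure how watermarks are generated by specifying a `WatermarkGenerator`.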
 
-There are two ways to assign timestamps and generate watermarks:
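+The Flink API expects a `WatermarkStrategy` that contains both a `TimestampAssigner` and a `WatermarkGenerator`. A number of common strategies are available out of the box in the `WatermarkStrategies` utility class, and users can also build their own strategies when needed. This is the `WatermarkStrategy` interface: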
 
-  1. Directly in the data stream source
-  2. Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted
-
-<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
-milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
-
-### Source Functions with Timestamps and Watermarks
+{% highlight java %}
+public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
 
-Stream sources can directly assign timestamps to the elements they produce, and they can also emit watermarks.
-When this is done, no timestamp assigner is needed.
-Note that if a timestamp assigner is used, any timestamps and watermarks provided by the source will be overwritten.
+    /**
+     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
+     * strategy.
+     */
+    @Override
+    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
 
-To assign a timestamp to an element in the source directly, the source must use the `collectWithTimestamp(...)`
-method on the `SourceContext`. To generate watermarks, the source must call the `emitWatermark(Watermark)` function.
+    /**
+     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
+     */
+    @Override
+    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+}
+{% endhighlight %}
 
-Below is a simple example of a *(non-checkpointed)* source that assigns timestamps and generates watermarks:
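+As mentioned above, you usually don't have to implement this interface yourself. Instead, you can use the common watermark strategies in the `WatermarkStrategies` utility class, or use that class to bundle a custom `TimestampAssigner` with a `WatermarkGenerator`. For example, to use a bounded-out-of-orderness watermark generator together with a lambda expression as the timestamp assigner, you can do the following: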
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-@Override
-public void run(SourceContext<MyType> ctx) throws Exception {
-	while (/* condition */) {
-		MyType next = getNext();
-		ctx.collectWithTimestamp(next, next.getEventTimestamp());
-
-		if (next.hasWatermarkTime()) {
-			ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
-		}
-	}
-}
+WatermarkStrategies
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withTimestampAssigner((event, timestamp) -> event.f0)
+        .build();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-override def run(ctx: SourceContext[MyType]): Unit = {
-	while (/* condition */) {
-		val next: MyType = getNext()
-		ctx.collectWithTimestamp(next, next.eventTimestamp)
-
-		if (next.hasWatermarkTime) {
-			ctx.emitWatermark(new Watermark(next.getWatermarkTime))
-		}
-	}
-}
+WatermarkStrategies
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
+    override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1
+  })
+  .build()
 {% endhighlight %}
+
+(Using Scala lambdas here currently doesn't work because this is hard to support in Scala.)
 </div>
 </div>
 
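+Specifying a `TimestampAssigner` is optional, and in most cases you don't need to specify one. For example, when using a Kafka or Kinesis source, you can get the timestamps directly from the Kafka/Kinesis records.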
 
-### Timestamp Assigners / Watermark Generators
+We will look at the `WatermarkGenerator` interface later, in the [Custom WatermarkGenerator](#自定义-watermarkgenerator) section.

Review comment:
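       I suggest adding an `<a>` tag to the heading. Then the English anchor can be used in the link here, and other pages that reference this section can keep their links unchanged.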
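The hunk above says the `TimestampAssigner` is optional because sources such as Kafka or Kinesis already attach a timestamp to each record. The sketch below shows the two cases. It assumes the `(element, recordTimestamp)` parameter shape visible in the Scala example in the hunk. `AssignerSketch`, `keepRecordTimestamp` and `ClickEvent` are hypothetical names, not Flink API.

```java
// Hypothetical mirror of the TimestampAssigner shape used in the Scala example:
// extractTimestamp(element, recordTimestamp), both in epoch millis.
@FunctionalInterface
interface AssignerSketch<T> {
    long extractTimestamp(T element, long recordTimestamp);

    // Fallback when no assigner is configured: keep the timestamp the source
    // (for example a Kafka or Kinesis record) already attached.
    static <T> AssignerSketch<T> keepRecordTimestamp() {
        return (element, recordTimestamp) -> recordTimestamp;
    }
}

// Hypothetical event type carrying its own event-time field in epoch millis.
class ClickEvent {
    final long eventTimeMillis;
    final String page;

    ClickEvent(long eventTimeMillis, String page) {
        this.eventTimeMillis = eventTimeMillis;
        this.page = page;
    }
}
```

An explicit assigner such as `(event, recordTimestamp) -> event.eventTimeMillis` is only needed when the timestamp lives inside the payload rather than on the source record.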

##########
File path: docs/dev/event_timestamps_watermarks.zh.md
##########
@@ -175,200 +137,243 @@ withTimestampsAndWatermarks
 </div>
 </div>
 
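+When a `WatermarkStrategy` is used to take a stream and produce a new stream with timestamped elements and watermarks, any timestamps or watermarks the original stream already had are overwritten by the newly specified timestamp assigner.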
 
-#### **With Periodic Watermarks**
+## Dealing With Idle Sources
 
-`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
-on the stream elements, or purely based on processing time).
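+If one of the partitions/shards of a source does not carry events for a while, the `WatermarkGenerator` also gets no new data on which to base a watermark. We call this an *idle input* or an *idle source*. This becomes a problem when some of the other partitions are still carrying events. A downstream operator's watermark is computed as the minimum over all of its parallel upstream inputs, so its watermark will not progress.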
 
-The interval (every *n* milliseconds) in which the watermark will be generated is defined via
-`ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner's `getCurrentWatermark()` method will be
-called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous
-watermark.
+To solve this, you can use a `WatermarkStrategy` that detects idleness and marks an input as idle. `WatermarkStrategies` provides a convenience method for this:
 
-Here we show two simple examples of timestamp assigners that use periodic watermark generation. Note that Flink ships with a `BoundedOutOfOrdernessTimestampExtractor` similar to the `BoundedOutOfOrdernessGenerator` shown below, which you can read about [here]({{ site.baseurl }}/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness).
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+WatermarkStrategies
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withIdleness(Duration.ofMinutes(1))
+        .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+WatermarkStrategies
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withIdleness(Duration.ofMinutes(1))
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+
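+## Custom WatermarkGenerator
+
+A `TimestampAssigner` is a simple function that extracts a timestamp field from an event, so we don't need to look at its implementation in detail. A `WatermarkGenerator`, however, is a bit more complicated to write. We will look at how to implement one in the next two subsections. This is the `WatermarkGenerator` interface: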
+
+{% highlight java %}
+/**
+ * The {@code WatermarkGenerator} generates watermarks either based on events or

Review comment:
       I suggest translating the comments as well.
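The idle-source problem in the hunk above arises because the downstream watermark is the minimum over all inputs. The model below illustrates that combination. `CombinedWatermarkSketch` is a hypothetical class that only approximates how Flink combines input watermarks. It rests on three assumptions: idle inputs are excluded from the minimum, which is what `withIdleness` aims to achieve; the combined watermark never moves backwards; and it holds its last value when every input is idle.

```java
import java.util.Arrays;

// Hypothetical model of a downstream operator combining input watermarks:
// the minimum over all inputs that are not marked idle.
class CombinedWatermarkSketch {
    private final long[] watermarks;
    private final boolean[] idle;
    private long lastCombined = Long.MIN_VALUE;

    CombinedWatermarkSketch(int inputs) {
        watermarks = new long[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        idle = new boolean[inputs];
    }

    // A watermark from an input also makes that input active again.
    void onWatermark(int input, long watermark) {
        idle[input] = false;
        watermarks[input] = Math.max(watermarks[input], watermark);
    }

    void markIdle(int input) {
        idle[input] = true;
    }

    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // Never move backwards; if every input is idle, hold the last value.
        if (anyActive && min > lastCombined) {
            lastCombined = min;
        }
        return lastCombined;
    }
}
```

With two inputs where only input 0 sends data, the combined watermark stays at `Long.MIN_VALUE` until input 1 is marked idle. After that it follows input 0.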

##########
File path: docs/dev/event_timestamps_watermarks.zh.md
##########
@@ -175,200 +137,243 @@ withTimestampsAndWatermarks
 </div>
 </div>
 
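+When a `WatermarkStrategy` is used to take a stream and produce a new stream with timestamped elements and watermarks, any timestamps or watermarks the original stream already had are overwritten by the newly specified timestamp assigner.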
 
-#### **With Periodic Watermarks**
+## Dealing With Idle Sources

Review comment:
       Add an `<a>` tag.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org