Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/17 13:08:30 UTC

[flink] branch master updated: [FLINK-26164] Document watermark alignment

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 106280e  [FLINK-26164] Document watermark alignment
106280e is described below

commit 106280e10a96d729943985986198b942446197d9
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Feb 15 16:36:33 2022 +0100

    [FLINK-26164] Document watermark alignment
    
    This closes #18783
---
 .../datastream/event-time/generating_watermarks.md | 70 ++++++++++++++++++++++
 .../datastream/event-time/generating_watermarks.md | 70 ++++++++++++++++++++++
 2 files changed, 140 insertions(+)
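
Note on the text added below: it describes the maximal drift as measured from the current minimal watermark of an alignment group. As a rough illustration only (this is not Flink code), the following minimal Java sketch assumes a source is paused once its watermark exceeds the group minimum plus the drift. The class and method names are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the alignment rule; not part of Flink.
final class WatermarkAlignmentSketch {

    /** Highest watermark a group member may reach: group minimum plus the allowed drift. */
    static long maxAllowedWatermark(Map<String, Long> watermarks, long maxDriftMillis) {
        // Assumes a non-empty group; Collections.min throws on an empty collection.
        return Collections.min(watermarks.values()) + maxDriftMillis;
    }

    /** Sources whose watermark is ahead of the allowed maximum and would be paused. */
    static List<String> pausedSources(Map<String, Long> watermarks, long maxDriftMillis) {
        long limit = maxAllowedWatermark(watermarks, maxDriftMillis);
        List<String> paused = new ArrayList<>();
        for (Map.Entry<String, Long> entry : watermarks.entrySet()) {
            if (entry.getValue() > limit) {
                paused.add(entry.getKey());
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        // Current watermarks (epoch millis) of two sources sharing one alignment group.
        Map<String, Long> group = new LinkedHashMap<>();
        group.put("kafka", 60_000L);
        group.put("files", 5_000L);
        long drift = 20_000L; // corresponds to Duration.ofSeconds(20) in the documented example

        System.out.println(maxAllowedWatermark(group, drift)); // 25000
        System.out.println(pausedSources(group, drift));       // [kafka]

        // Once the lagging source catches up, the fast one is no longer ahead of the limit.
        group.put("files", 45_000L);
        System.out.println(pausedSources(group, drift));       // []
    }
}
```

In this sketch the fast source resumes as soon as the slow source advances the group minimum, which mirrors the "unblock the faster one" behaviour described in the documentation.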

diff --git a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
index bed1bc5..46294f1 100644
--- a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
@@ -165,6 +165,76 @@ WatermarkStrategy
 {{< /tab >}}
 {{< /tabs >}}
 
+## Watermark alignment _`Beta`_
+
+In the previous section we discussed a situation in which splits/partitions/shards or sources are
+idle and can stall increasing watermarks. On the other side of the spectrum, a split/partition/shard
+or source may process records very fast and in turn increase its watermark faster than the others.
+This is not a problem on its own. However, for downstream operators that use watermarks to emit
+data, it can become a problem.
+
+In this case, contrary to idle sources, the watermark of such a downstream operator (like windowed
+joins or aggregations) can progress. However, such an operator might need to buffer an excessive
+amount of data coming from the fast inputs, as the minimal watermark from all of its inputs is
+held back by the lagging input. All records emitted by the fast input will hence have to be
+buffered in the downstream operator's state, which can lead to uncontrolled growth of that
+state.
+
+In order to address the issue, you can enable watermark alignment, which will make sure no
+sources/splits/shards/partitions increase their watermarks too far ahead of the rest. You can enable
+alignment for every source separately:
+
+
+{{< tabs >}}
+{{< tab "Java" >}}
+```java
+WatermarkStrategy
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+WatermarkStrategy
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< hint warning >}}
+**Note:** You can enable watermark alignment only for [FLIP-27]({{< ref "docs/dev/datastream/sources" >}})
+sources. It does not work for legacy sources or if the strategy is applied after the source via
+[DataStream#assignTimestampsAndWatermarks](#using-watermark-strategies).
+{{< /hint >}}
+
+When enabling alignment, you need to tell Flink which group the source should belong to. You do
+that by providing a label (e.g. `alignment-group-1`) which binds together all sources that share it.
+Moreover, you have to specify the maximal drift from the current minimal watermark across all
+sources belonging to that group. The third parameter describes how often the current maximal
+watermark should be updated. The downside of frequent updates is that there will be more RPC
+messages travelling between TaskManagers (TMs) and the JobManager (JM).
+
+To achieve alignment, Flink pauses consuming from a source/task that has generated a watermark too
+far into the future. In the meantime it continues reading records from other sources/tasks, which
+can move the combined watermark forward and in that way unblock the faster
+one.
+
+{{< hint warning >}}
+**Note:** As of 1.15, Flink supports aligning across tasks of the same source and/or different
+sources. It does not support aligning splits/partitions/shards in the same task.
+
+If, for example, two Kafka partitions that produce watermarks at different paces are assigned to
+the same task, the watermark might not behave as expected. Fortunately, in the worst case it
+should not perform worse than without alignment.
+
+Given the limitation above, we suggest applying watermark alignment in two situations:
+
+1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds.
+2. You run your source with parallelism equal to the number of splits/shards/partitions, which
+   results in every subtask being assigned a single unit of work.
+
+{{< /hint >}}
 
 <a name="writing-watermarkgenerators"></a>
 
diff --git a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
index 9d1bb83..fc20833 100644
--- a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
@@ -201,6 +201,76 @@ WatermarkStrategy
 {{< /tab >}}
 {{< /tabs >}}
 
+## Watermark alignment _`Beta`_
+
+In the previous section we discussed a situation in which splits/partitions/shards or sources are
+idle and can stall increasing watermarks. On the other side of the spectrum, a split/partition/shard
+or source may process records very fast and in turn increase its watermark faster than the others.
+This is not a problem on its own. However, for downstream operators that use watermarks to emit
+data, it can become a problem.
+
+In this case, contrary to idle sources, the watermark of such a downstream operator (like windowed
+joins or aggregations) can progress. However, such an operator might need to buffer an excessive
+amount of data coming from the fast inputs, as the minimal watermark from all of its inputs is
+held back by the lagging input. All records emitted by the fast input will hence have to be
+buffered in the downstream operator's state, which can lead to uncontrolled growth of that
+state.
+
+In order to address the issue, you can enable watermark alignment, which will make sure no
+sources/splits/shards/partitions increase their watermarks too far ahead of the rest. You can enable
+alignment for every source separately:
+
+
+{{< tabs >}}
+{{< tab "Java" >}}
+```java
+WatermarkStrategy
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+WatermarkStrategy
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< hint warning >}}
+**Note:** You can enable watermark alignment only for [FLIP-27]({{< ref "docs/dev/datastream/sources" >}})
+sources. It does not work for legacy sources or if the strategy is applied after the source via
+[DataStream#assignTimestampsAndWatermarks](#using-watermark-strategies).
+{{< /hint >}}
+
+When enabling alignment, you need to tell Flink which group the source should belong to. You do
+that by providing a label (e.g. `alignment-group-1`) which binds together all sources that share it.
+Moreover, you have to specify the maximal drift from the current minimal watermark across all
+sources belonging to that group. The third parameter describes how often the current maximal
+watermark should be updated. The downside of frequent updates is that there will be more RPC
+messages travelling between TaskManagers (TMs) and the JobManager (JM).
+
+To achieve alignment, Flink pauses consuming from a source/task that has generated a watermark too
+far into the future. In the meantime it continues reading records from other sources/tasks, which
+can move the combined watermark forward and in that way unblock the faster
+one.
+
+{{< hint warning >}}
+**Note:** As of 1.15, Flink supports aligning across tasks of the same source and/or different
+sources. It does not support aligning splits/partitions/shards in the same task.
+
+If, for example, two Kafka partitions that produce watermarks at different paces are assigned to
+the same task, the watermark might not behave as expected. Fortunately, in the worst case it
+should not perform worse than without alignment.
+
+Given the limitation above, we suggest applying watermark alignment in two situations:
+
+1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds.
+2. You run your source with parallelism equal to the number of splits/shards/partitions, which
+   results in every subtask being assigned a single unit of work.
+
+{{< /hint >}}
 
 ## Writing WatermarkGenerators