Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/19 15:05:39 UTC

[GitHub] [flink] aljoscha opened a new pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

aljoscha opened a new pull request #12253:
URL: https://github.com/apache/flink/pull/12253


   ## What is the purpose of the change
   
   This brings the documentation up-to-date with the latest changes. This is not a simple find-replace job: I also restructured/updated the documentation, removed some sections that contained outdated advice, and added new sections.
   
   It's probably easiest to not look at the diff but to build the docs locally and check out the event-time section.
   
   @sjwiesman @alpinegizmo Could you please review this?
   
   fyi @StephanEwen 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-632693683


   👌 I'm merging





[GitHub] [flink] sjwiesman commented on a change in pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #12253:
URL: https://github.com/apache/flink/pull/12253#discussion_r429225536



##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}

Review comment:
       Yeah, let’s remove it. How many people are really building custom sources anyway?
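For readers who do keep a section like this: the contract in the `run(...)` example is that records and watermarks leave the source through one ordered channel. Below is a stdlib-only Java sketch of that loop. `Event`, `RecordingSourceContext` and `TimestampingSource` are hypothetical stand-ins for `MyType`, Flink's `SourceContext` and a user source; only the method names `collectWithTimestamp` and `emitWatermark` are taken from the diff.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical element type that carries its own event time and,
// optionally, a watermark hint (playing the role of MyType in the docs).
class Event {
    final String payload;
    final long eventTimestamp;
    final Long watermarkTime; // null when the element carries no watermark hint

    Event(String payload, long eventTimestamp, Long watermarkTime) {
        this.payload = payload;
        this.eventTimestamp = eventTimestamp;
        this.watermarkTime = watermarkTime;
    }

    boolean hasWatermarkTime() {
        return watermarkTime != null;
    }
}

// Hypothetical stand-in for a SourceContext: it only records, in order,
// what the source emitted so the interleaving can be inspected.
class RecordingSourceContext {
    final List<String> emitted = new ArrayList<>();

    void collectWithTimestamp(Event e, long timestamp) {
        emitted.add("record " + e.payload + "@" + timestamp);
    }

    void emitWatermark(long watermark) {
        emitted.add("watermark " + watermark);
    }
}

class TimestampingSource {
    // Same shape as the run(...) loop in the docs: attach the event timestamp
    // to every element and emit a watermark whenever the element carries one.
    static void run(Iterator<Event> input, RecordingSourceContext ctx) {
        while (input.hasNext()) {
            Event next = input.next();
            ctx.collectWithTimestamp(next, next.eventTimestamp);
            if (next.hasWatermarkTime()) {
                ctx.emitWatermark(next.watermarkTime);
            }
        }
    }
}
```

In a real job the runtime supplies the context; the sketch only makes the ordering of records and watermarks visible.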







[GitHub] [flink] flinkbot commented on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-630883196


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit c916fded396ee0d7cf9f03adf794c3929d3c3823 (Tue May 19 15:07:30 UTC 2020)
   
   **Warnings:**
    * Documentation files were touched, but no `.zh.md` files: Update Chinese documentation or file Jira ticket.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] sjwiesman commented on a change in pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #12253:
URL: https://github.com/apache/flink/pull/12253#discussion_r427550788



##########
File path: docs/dev/event_time.md
##########
@@ -34,30 +34,22 @@ For information about how to use time in Flink programs refer to
 [ProcessFunction]({% link
 dev/stream/operators/process_function.md %}).
 
-* toc
-{:toc}
+A prerequisite for using *event time* processing is setting the right *time
+characteristic*. That setting defines how data stream sources behave (for
+example, whether they will assign timestamps), and what notion of time should
+be used by window operations like `KeyedStream.timeWindow(Time.seconds(30))`.
 
-## Setting a Time Characteristic
-
-The first part of a Flink DataStream program usually sets the base *time characteristic*. That setting
-defines how data stream sources behave (for example, whether they will assign timestamps), and what notion of
-time should be used by window operations like `KeyedStream.timeWindow(Time.seconds(30))`.
-
-The following example shows a Flink program that aggregates events in hourly time windows. The behavior of the
-windows adapts with the time characteristic.
+You can set the time characteristic using
+`StreamExecutionEnvironment.setStreamTimeCharacteristic()`:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer010<MyEvent>(topic, schema, props));

Review comment:
       +1
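As the hunk says, the time characteristic decides which notion of time a window such as `timeWindow(Time.seconds(30))` uses. A minimal stdlib-only Java sketch of event-time tumbling-window assignment; `TumblingWindows` is a hypothetical helper, not Flink API, and it assumes non-negative epoch-millisecond timestamps and a window offset of zero:

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWindows {
    // Start of the tumbling window containing the timestamp
    // (assumes timestamp >= 0 and a window offset of zero).
    static long windowStart(long timestamp, long windowSizeMillis) {
        return timestamp - (timestamp % windowSizeMillis);
    }

    // Counts elements per window, keyed by window start. Grouping by the
    // event timestamp rather than by arrival time is what makes this event time.
    static Map<Long, Integer> countPerWindow(long[] eventTimestamps, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimestamps) {
            counts.merge(windowStart(ts, windowSizeMillis), 1, Integer::sum);
        }
        return counts;
    }
}
```

Because only the event timestamp is consulted, reordering the input array yields the same counts; under processing time the grouping would instead depend on when elements arrive.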







[GitHub] [flink] aljoscha commented on a change in pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #12253:
URL: https://github.com/apache/flink/pull/12253#discussion_r429212797



##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}

Review comment:
       For this one, I just copied the existing example. I might update to the new Source interface as a follow-up, but that's very different. Maybe we shouldn't even mention this here?







[GitHub] [flink] flinkbot edited a comment on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-630911127


   ## CI report:
   
   * c916fded396ee0d7cf9f03adf794c3929d3c3823 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1876) 
   * e97f18148cb95bca953ace18cb226cbbbab8f00d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2045) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-630911127


   ## CI report:
   
   * c916fded396ee0d7cf9f03adf794c3929d3c3823 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1876) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] sjwiesman commented on a change in pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #12253:
URL: https://github.com/apache/flink/pull/12253#discussion_r427558322



##########
File path: docs/dev/event_timestamp_extractors.md
##########
@@ -25,83 +25,80 @@ under the License.
 * toc
 {:toc}
 
-As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending
-on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of
-the incoming records, e.g. whenever a special element is encountered in the stream.
+As described in [Generating Watermarks]({{ site.baseurl
+}}/dev/event_timestamps_watermarks.html), Flink provides abstractions that
+allow the programmer to assign their own timestamps and emit their own
+watermarks. More specifically, one can do so by implementing the
+`WatermarkGenerator` interface.
 
-In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.
-This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example
-for custom implementations.
+In order to further ease the programming effort for such tasks, Flink comes
+with some pre-implemented timestamp assigners.  This section provides a list of
+them. Apart from their out-of-the-box functionality, their implementation can
+serve as an example for custom implementations.
 
-### **Assigners with ascending timestamps**
+## Monotonically Increasing Timestamps
 
-The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task
-occur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps will
-arrive.
+The simplest special case for *periodic* watermark generation is the case where

Review comment:
       You use the word case three times in two sentences.
   ```suggestion
   The simplest special case for *periodic* watermark generation is when
   ```
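For ascending timestamps, the latest timestamp seen so far can serve as the watermark. A stdlib-only Java sketch of that periodic logic, generalized with an out-of-orderness bound (a bound of zero gives the ascending case). To our understanding, Flink's built-in bounded-out-of-orderness generator emits `maxTimestamp - bound - 1`, and this sketch follows that convention; `PeriodicMaxWatermark` is hypothetical, not Flink API.

```java
class PeriodicMaxWatermark {
    private final long outOfOrdernessMillis;
    // Initialized so that the first reported watermark is Long.MIN_VALUE
    // instead of underflowing.
    private long maxTimestamp;

    PeriodicMaxWatermark(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every element: only remember the largest timestamp seen.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called periodically: assuming the bound holds, no element with a
    // timestamp at or below this value is expected anymore.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```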

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -22,115 +22,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+In this section you will learn about the APIs that Flink provides for working
+with **event time** timestamps and watermarks.  For an introduction to *event
+time*, *processing time*, and *ingestion time*, please refer to the
+[introduction to event time]({{ site.baseurl }}/dev/event_time.html).
+
 * toc
 {:toc}
 
+## Introduction to Watermark Strategies
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
+In order to work with *event time*, Flink needs to know the events'

Review comment:
       ```suggestion
   In order to work with *event time*, Flink needs to know the events
   ```

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -22,115 +22,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+In this section you will learn about the APIs that Flink provides for working
+with **event time** timestamps and watermarks.  For an introduction to *event
+time*, *processing time*, and *ingestion time*, please refer to the
+[introduction to event time]({{ site.baseurl }}/dev/event_time.html).
+
 * toc
 {:toc}
 
+## Introduction to Watermark Strategies
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
+In order to work with *event time*, Flink needs to know the events'
+*timestamps*, meaning each element in the stream needs to have its event
+timestamp *assigned*. This is usually done by accessing/extracting the
+timestamp from some field in the element by using a `TimestampAssigner`.
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
-progress in event time.
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell
+the system about progress in event time. You can configure this by speciying a
+`WatermarkGenerator`.
 
-There are two ways to assign timestamps and generate watermarks:
+The Flink API expects both `TimestampAssigner` and `WatermarkGenerator` to be
+bundled in a `WatermarkStrategy`. As you will see below, you usually don't have
+to do this by hand because you will mostly use the convenience helper
+`WatermarkStrategies`. Here is the interface for completess' sake:

Review comment:
       I think it always makes sense to let users know there is an easy option first and then outline the low-level interface. It's less scary 👻
   ```suggestion
   The Flink API expects a `WatermarkStrategy` that contains both a `TimestampAssigner` and `WatermarkGenerator`.
   A number of common strategies are available out of the box in the `WatermarkStrategies` helper, but users can also build their own strategies when required.
   Here is the interface for completeness' sake:
   ```
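The division of labor described in that suggestion (an assigner that reads the timestamp, a generator that turns timestamps into watermarks, both bundled in one strategy) can be sketched with stdlib-only stand-ins. `SimpleWatermarkGenerator` and `SimpleWatermarkStrategy` are hypothetical simplifications; Flink's real `WatermarkStrategy` methods take supplier contexts and the generator writes to a `WatermarkOutput`, which this sketch omits.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

// Hypothetical, simplified generator: observes timestamps, reports a watermark.
interface SimpleWatermarkGenerator {
    void onEvent(long eventTimestamp);
    long currentWatermark();
}

// Hypothetical bundle of a timestamp assigner and a generator factory,
// loosely modeled on the role of Flink's WatermarkStrategy.
class SimpleWatermarkStrategy<T> {
    private final ToLongFunction<T> timestampAssigner;
    private final Supplier<SimpleWatermarkGenerator> generatorFactory;

    SimpleWatermarkStrategy(ToLongFunction<T> timestampAssigner,
                            Supplier<SimpleWatermarkGenerator> generatorFactory) {
        this.timestampAssigner = timestampAssigner;
        this.generatorFactory = generatorFactory;
    }

    // Convenience factory, in the spirit of a "common strategy out of the box".
    static <T> SimpleWatermarkStrategy<T> boundedOutOfOrderness(
            ToLongFunction<T> timestampAssigner, long boundMillis) {
        return new SimpleWatermarkStrategy<>(timestampAssigner, () -> new SimpleWatermarkGenerator() {
            private long maxTimestamp = Long.MIN_VALUE + boundMillis + 1;

            @Override
            public void onEvent(long eventTimestamp) {
                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            }

            @Override
            public long currentWatermark() {
                return maxTimestamp - boundMillis - 1;
            }
        });
    }

    // Runs a fresh generator over a finite input: the assigner extracts each
    // timestamp, the generator consumes it, and the final watermark is returned
    // (standing in for a periodic emit).
    long watermarkAfter(List<T> elements) {
        SimpleWatermarkGenerator generator = generatorFactory.get();
        for (T element : elements) {
            generator.onEvent(timestampAssigner.applyAsLong(element));
        }
        return generator.currentWatermark();
    }
}
```

Users who only need a standard policy pick a factory; only custom policies require implementing the generator interface by hand.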

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def run(ctx: SourceContext[MyType]): Unit = {
+    while (/* condition */) {
+        val next: MyType = getNext()
+        ctx.collectWithTimestamp(next, next.eventTimestamp)
+
+        if (next.hasWatermarkTime) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
+        }
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## How Operators Are Processing Watermarks

Review comment:
       ```suggestion
   ## How Operators Process Watermarks
   ```

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -22,115 +22,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+In this section you will learn about the APIs that Flink provides for working
+with **event time** timestamps and watermarks.  For an introduction to *event
+time*, *processing time*, and *ingestion time*, please refer to the
+[introduction to event time]({{ site.baseurl }}/dev/event_time.html).
+
 * toc
 {:toc}
 
+## Introduction to Watermark Strategies
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
+In order to work with *event time*, Flink needs to know the events'
+*timestamps*, meaning each element in the stream needs to have its event
+timestamp *assigned*. This is usually done by accessing/extracting the
+timestamp from some field in the element by using a `TimestampAssigner`.
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
-progress in event time.
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell
+the system about progress in event time. You can configure this by speciying a

Review comment:
       ```suggestion
   the system about progress in event time. You can configure this by specifying a
   ```

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -270,105 +340,115 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent]
 
     val maxTimeLag = 5000L // 5 seconds
 
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        element.getCreationTime
+    override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
+        // don't need to do anything because we work on processing time
     }
 
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current time minus the maximum time lag
-        new Watermark(System.currentTimeMillis() - maxTimeLag)
+    override def onPeriodicEmit(): Unit = {
+        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-#### **With Punctuated Watermarks**
+### Writing a Punctuated WatermarkGenerator
 
-To generate watermarks whenever a certain event indicates that a new watermark might be generated, use
-`AssignerWithPunctuatedWatermarks`. For this class Flink will first call the `extractTimestamp(...)` method
-to assign the element a timestamp, and then immediately call the
-`checkAndGetNextWatermark(...)` method on that element.
+As mentioned above, a punctuated watermark generator will observe the stream of

Review comment:
       ```suggestion
   A punctuated watermark generator will observe the stream of
   ```
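A punctuated generator decides per element whether to emit a watermark, instead of on a timer. A stdlib-only Java sketch, assuming (hypothetically) that each event carries a boolean marker flag; the `emitted` list stands in for Flink's `WatermarkOutput`. Since watermarks should never move backwards, this sketch also drops marker watermarks that would not advance; that guard is an assumption of the sketch, not something stated in the diff.

```java
import java.util.ArrayList;
import java.util.List;

class PunctuatedMarkerGenerator {
    // Stand-in for the watermark output: every watermark emitted, in order.
    final List<Long> emitted = new ArrayList<>();

    // Called for every element. Only elements flagged as markers produce a
    // watermark, and only if it moves the watermark forward.
    void onEvent(long eventTimestamp, boolean isMarker) {
        boolean advances = emitted.isEmpty() || eventTimestamp > emitted.get(emitted.size() - 1);
        if (isMarker && advances) {
            emitted.add(eventTimestamp);
        }
    }

    // Called periodically; a punctuated generator has nothing to do here.
    void onPeriodicEmit() {
    }
}
```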

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def run(ctx: SourceContext[MyType]): Unit = {
+    while (/* condition */) {
+        val next: MyType = getNext()
+        ctx.collectWithTimestamp(next, next.eventTimestamp)
+
+        if (next.hasWatermarkTime) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
+        }
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## How Operators Are Processing Watermarks
+
+As a general rule, operators are required to completely process a given
+watermark before forwarding it downstream. For example, `WindowOperator` will
+first evaluate which windows should be fired, and only after producing all of

Review comment:
       ```suggestion
   first evaluate all windows that should be fired, and only after producing all of
   ```
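The ordering rule in this hunk (fire every window the watermark completes, then forward the watermark) can be modeled in plain Java. `EventTimeWindowOperator` is a hypothetical sketch, not Flink's `WindowOperator`; it assumes tumbling windows, non-negative timestamps, and fires a window once the watermark reaches its last timestamp (`end - 1`), which to our understanding matches Flink's event-time trigger.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EventTimeWindowOperator {
    private final long windowSizeMillis;
    // Pending element counts per window, keyed (and sorted) by window start.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    // Everything sent downstream, in order: window results and watermarks.
    final List<String> downstream = new ArrayList<>();

    EventTimeWindowOperator(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    void processElement(long timestamp) {
        long start = timestamp - (timestamp % windowSizeMillis); // assumes timestamp >= 0
        pending.merge(start, 1, Integer::sum);
    }

    void processWatermark(long watermark) {
        // First fire every window whose last timestamp (end - 1) is covered...
        Iterator<Map.Entry<Long, Integer>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> window = it.next();
            long end = window.getKey() + windowSizeMillis;
            if (end - 1 > watermark) {
                break; // windows are sorted by start, so later ones cannot fire either
            }
            downstream.add("window [" + window.getKey() + ", " + end + ") count=" + window.getValue());
            it.remove();
        }
        // ...and only then forward the watermark.
        downstream.add("watermark " + watermark);
    }
}
```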

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}

Review comment:
       Should this use the new source interface? 

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -175,18 +164,103 @@ withTimestampsAndWatermarks
 </div>
 </div>
 
+Using a `WatermarkStrategy` this way takes a stream and produces a new stream
+with timestamped elements and watermarks. If the original stream had timestamps
+and/or watermarks already, the timestamp assigner overwrites them.
+
+## Dealing With Idle Sources
+
+If one of the input splits/partitions/shards does not carry events for a while,
+the `WatermarkGenerator` does not get any new information on which to base a
+watermark. We call this an *idle input* or an *idle source*. This is a problem
+when other partitions still carry events: the watermark is held back, because
+it is computed as the minimum over all the different parallel watermarks.
+
+To deal with this, you can use a `WatermarkStrategy` that will detect idleness and mark an input as idle. `WatermarkStrategies` provides a convenience helper for this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+WatermarkStrategies
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withIdleness(Duration.ofMinutes(1))
+        .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+WatermarkStrategies
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withIdleness(Duration.ofMinutes(1))
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+
+## Writing WatermarkGenerators
+
+A `TimestampAssigner` is a simple function that extracts a field from an event, so we don't need to look at it in detail. A `WatermarkGenerator`, on the other hand, is a bit more complicated to write, and we will look at how you can do that in the next two sections. This is the `WatermarkGenerator` interface:
+
+{% highlight java %}
+/**
+ * The {@code WatermarkGenerator} generates watermarks either based on events or
+ * periodically (in a fixed interval).
+ *
+ * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
+ * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
+ */
+@Public
+public interface WatermarkGenerator<T> {
+
+    /**
+     * Called for every event, allows the watermark generator to examine and remember the
+     * event timestamps, or to emit a watermark based on the event itself.
+     */
+    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
+
+    /**
+     * Called periodically, and might emit a new watermark, or not.
+     *
+     * <p>The interval in which this method is called and Watermarks are generated
+     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
+     */
+    void onPeriodicEmit(WatermarkOutput output);
+}
+{% endhighlight %}
+
+There are two different styles of watermark generation: *periodic* and
+*punctuated*.
+
+A periodic generator usually observes the incoming events via `onEvent()`
+and then emits a watermark when the framework calls `onPeriodicEmit()`.
+
+A punctuated generator will look at events in `onEvent()` and wait for special
+*marker events* or *punctuations* that carry watermark information in the
+stream. When it sees one of these events it emits a watermark immediately.
+Usually, punctuated generators don't emit a watermark from `onPeriodicEmit()`.
 
-#### **With Periodic Watermarks**
+We will look at how to implement generators for each style next.
 
-`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
-on the stream elements, or purely based on processing time).
+### Writing a Periodic WatermarkGenerator
 
-The interval (every *n* milliseconds) in which the watermark will be generated is defined via
-`ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner's `getCurrentWatermark()` method will be
-called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous
-watermark.
+As mentioned above, a periodic generator observes stream events and generates

Review comment:
       You use the phrase "as mentioned above" a lot.
   ```suggestion
   A periodic generator observes stream events and generates
   ```
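The idle-input problem from the *Dealing With Idle Sources* hunk can be illustrated with a small, self-contained Java sketch. `CombinedWatermark` is a hypothetical helper, not a Flink class, and it is not how Flink implements this internally. It keeps the latest watermark per parallel input, takes the minimum over the inputs that are not marked idle, and never lets the combined value go backwards. While one input is silent but not idle, the combined watermark stays at `Long.MIN_VALUE`; once that input is marked idle, the other input alone drives progress.

```java
import java.util.Arrays;

/**
 * Hypothetical helper (not a Flink class) that combines the watermarks of
 * several parallel inputs: minimum over all non-idle inputs, never decreasing.
 */
class CombinedWatermark {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    CombinedWatermark(int numInputs) {
        watermarks = new long[numInputs];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no watermark seen yet
        idle = new boolean[numInputs];
    }

    void onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false; // a new watermark makes the input active again
    }

    void markIdle(int input) {
        idle[input] = true;
    }

    long current() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        // if every input is idle, or the minimum would regress, keep the old value
        if (anyActive && min > combined) {
            combined = min;
        }
        return combined;
    }
}
```

The "never decreasing" rule is a design choice of this sketch: when an idle input resumes with a lower watermark, the combined value simply waits until that input catches up.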

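The two generator styles described in the *Writing WatermarkGenerators* hunk can be sketched against a local stand-in that mirrors the two callbacks of the quoted interface. `SimpleWatermarkOutput` and `SimpleWatermarkGenerator` are hypothetical simplifications: Flink's real `WatermarkOutput` takes a `Watermark` object, while these take a plain `long`. `BoundedOutOfOrdernessSketch` follows the periodic style (remember the maximum timestamp in `onEvent()`, emit only in `onPeriodicEmit()`), and `PunctuatedSketch` emits as soon as it sees a hypothetical `MarkedEvent` flagged as a marker.

```java
/** Stand-in for Flink's WatermarkOutput; takes a plain long instead of a Watermark. */
interface SimpleWatermarkOutput {
    void emitWatermark(long watermark);
}

/** Stand-in mirroring the two callbacks of WatermarkGenerator. */
interface SimpleWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output);

    void onPeriodicEmit(SimpleWatermarkOutput output);
}

/** Periodic style: track the maximum timestamp, emit only when asked to. */
class BoundedOutOfOrdernessSketch<T> implements SimpleWatermarkGenerator<T> {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // chosen so that the first periodic watermark is Long.MIN_VALUE, without underflow
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(SimpleWatermarkOutput output) {
        // events up to this timestamp are assumed to have arrived already
        output.emitWatermark(maxTimestamp - maxOutOfOrderness - 1);
    }
}

/** Hypothetical event type in which some events act as watermark punctuations. */
class MarkedEvent {
    final long timestamp;
    final boolean isMarker;

    MarkedEvent(long timestamp, boolean isMarker) {
        this.timestamp = timestamp;
        this.isMarker = isMarker;
    }
}

/** Punctuated style: emit immediately on a marker event, never periodically. */
class PunctuatedSketch implements SimpleWatermarkGenerator<MarkedEvent> {
    @Override
    public void onEvent(MarkedEvent event, long eventTimestamp, SimpleWatermarkOutput output) {
        if (event.isMarker) {
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(SimpleWatermarkOutput output) {
        // nothing to do: punctuated generators emit from onEvent()
    }
}
```

In the periodic sketch, out-of-order events (a timestamp lower than the current maximum) never move the watermark backwards, because only the maximum is remembered.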






[GitHub] [flink] flinkbot edited a comment on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-630911127


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c916fded396ee0d7cf9f03adf794c3929d3c3823",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1876",
       "triggerID" : "c916fded396ee0d7cf9f03adf794c3929d3c3823",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e97f18148cb95bca953ace18cb226cbbbab8f00d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e97f18148cb95bca953ace18cb226cbbbab8f00d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c916fded396ee0d7cf9f03adf794c3929d3c3823 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1876) 
   * e97f18148cb95bca953ace18cb226cbbbab8f00d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] sjwiesman commented on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-632680139


   @aljoscha let’s remove the custom source stuff and then +1 to merge





[GitHub] [flink] flinkbot commented on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-630911127


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c916fded396ee0d7cf9f03adf794c3929d3c3823",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c916fded396ee0d7cf9f03adf794c3929d3c3823",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c916fded396ee0d7cf9f03adf794c3929d3c3823 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha closed pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
aljoscha closed pull request #12253:
URL: https://github.com/apache/flink/pull/12253


   





[GitHub] [flink] aljoscha commented on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-632663787


   Thanks for the thorough review!





[GitHub] [flink] aljoscha commented on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-632695686


   @sjwiesman do you want to jump on https://issues.apache.org/jira/browse/FLINK-17886





[GitHub] [flink] aljoscha commented on a change in pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #12253:
URL: https://github.com/apache/flink/pull/12253#discussion_r429213190



##########
File path: docs/dev/event_timestamp_extractors.md
##########
@@ -25,83 +25,80 @@ under the License.
 * toc
 {:toc}
 
-As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending
-on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of
-the incoming records, e.g. whenever a special element is encountered in the stream.
+As described in [Generating Watermarks]({{ site.baseurl
+}}/dev/event_timestamps_watermarks.html), Flink provides abstractions that
+allow the programmer to assign their own timestamps and emit their own
+watermarks. More specifically, one can do so by implementing the
+`WatermarkGenerator` interface.
 
-In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.
-This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example
-for custom implementations.
+In order to further ease the programming effort for such tasks, Flink comes
+with some pre-implemented timestamp assigners.  This section provides a list of
+them. Apart from their out-of-the-box functionality, their implementation can
+serve as an example for custom implementations.
 
-### **Assigners with ascending timestamps**
+## Monotonically Increasing Timestamps
 
-The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task
-occur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps will
-arrive.
+The simplest special case for *periodic* watermark generation is the case where

Review comment:
       In my defense: this is existing copy that I just reused... 😅

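For the ascending-timestamps case described in the hunk above (timestamps seen by a source task never decrease), a minimal periodic generator only needs to remember the latest timestamp. `AscendingTimestampsSketch` is hypothetical and uses a plain `java.util.function.LongConsumer` in place of Flink's `WatermarkOutput`. It emits the highest timestamp seen minus one: a watermark `t` declares that no more elements with a timestamp of `t` or lower are expected, and subtracting one keeps later elements that share the current maximum timestamp from being treated as late.

```java
import java.util.function.LongConsumer;

/**
 * Hypothetical periodic generator for sources whose timestamps never decrease.
 * A LongConsumer stands in for Flink's WatermarkOutput.
 */
class AscendingTimestampsSketch {
    private long maxTimestamp = Long.MIN_VALUE;

    void onEvent(long eventTimestamp) {
        // with ascending input this is simply the latest timestamp
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    void onPeriodicEmit(LongConsumer output) {
        // skip emission until the first event has been seen
        if (maxTimestamp != Long.MIN_VALUE) {
            output.accept(maxTimestamp - 1);
        }
    }
}
```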






[GitHub] [flink] flinkbot edited a comment on pull request #12253: [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12253:
URL: https://github.com/apache/flink/pull/12253#issuecomment-630911127


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c916fded396ee0d7cf9f03adf794c3929d3c3823",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1876",
       "triggerID" : "c916fded396ee0d7cf9f03adf794c3929d3c3823",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c916fded396ee0d7cf9f03adf794c3929d3c3823 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1876) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

