Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/21 14:41:10 UTC

[GitHub] [flink] alpinegizmo opened a new pull request #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

alpinegizmo opened a new pull request #11845:
URL: https://github.com/apache/flink/pull/11845


   ## What is the purpose of the change
   
   Adds a tutorial to the docs covering the basics of Process Functions and Side Outputs.
   
   ## Brief change log
   
   Adds docs/tutorials/event_driven.md
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * 9845d84455ca175b046dc1a4e400aa0ec11f7619 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/161267645) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7878) 
   * e30b1b23a9eb209b9fdac786aed7010577d32e08 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161296189) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7885) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * 9845d84455ca175b046dc1a4e400aa0ec11f7619 UNKNOWN
   





[GitHub] [flink] NicoK commented on a change in pull request #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
NicoK commented on a change in pull request #11845:
URL: https://github.com/apache/flink/pull/11845#discussion_r412425045



##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl

Review comment:
       Is `nav-id: etl` actually the right one for a page titled "Event-driven Applications"?

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall

Review comment:
       TODO: add the missing link targets for the HourlyTips exercise and the preceding Streaming Analytics page.

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a

Review comment:
       ```suggestion
   It is reasonably straightforward, and educational, to do the same thing with a
   ```

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 

Review comment:
       ```suggestion
   * There are several types of ProcessFunctions -- this is a `KeyedProcessFunction`, but there are also
     `CoProcessFunctions`, `BroadcastProcessFunctions`, etc. 
   ```

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.

Review comment:
       ```suggestion
   * A `KeyedProcessFunction` is a kind of `RichFunction`. Being a `RichFunction`, it has access to the `open`
     and `getRuntimeContext` methods needed for working with managed keyed state.
   ```

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:

Review comment:
       ```suggestion
   `KeyedProcessFunction`. Let us begin by replacing the code above with this:
   ```

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events

Review comment:
       Hyphenate only when it modifies a noun: "an out-of-order event" vs. "events arriving out of order".
   ```suggestion
   Because the fare events can arrive out of order, it will sometimes be necessary to process events
   ```

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that

Review comment:
       ```suggestion
   simultaneously, rather than just two. This implementation supports this by using a `MapState` that
   ```
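
The paragraph under review says that several windows can be open at once for the same driver, which is why the state is a map keyed by window end time. A minimal plain-Java sketch of that idea follows. It is not Flink code and not the tutorial's implementation: the class `PseudoWindowSketch`, its methods, and the window-end convention (the last millisecond the window covers) are assumptions made for illustration, and a `TreeMap` stands in for the per-driver `MapState<Long, Float>`. Event times are assumed to be non-negative.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the state that one driver (one key) would own.
class PseudoWindowSketch {
    // Window end time -> running sum of tips; plays the role of MapState<Long, Float>.
    private final Map<Long, Float> sumOfTips = new TreeMap<>();
    private final long durationMsec;

    PseudoWindowSketch(long durationMsec) {
        this.durationMsec = durationMsec;
    }

    // Assumed convention: a window is identified by the last millisecond it covers.
    long endOfWindow(long eventTime) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    // Add a tip to whichever window the event's timestamp falls into,
    // regardless of the order in which events arrive.
    void add(long eventTime, float tip) {
        sumOfTips.merge(endOfWindow(eventTime), tip, Float::sum);
    }

    int openWindows() {
        return sumOfTips.size();
    }

    Float sumFor(long windowEnd) {
        return sumOfTips.get(windowEnd);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        PseudoWindowSketch w = new PseudoWindowSketch(hour);
        w.add(10L, 1.0f);        // event in the first hour
        w.add(hour + 5L, 2.0f);  // event in the second hour
        w.add(20L, 0.5f);        // out-of-order event for the first hour
        System.out.println(w.openWindows());      // 2
        System.out.println(w.sumFor(hour - 1L));  // 1.5
    }
}
```

With a single running sum instead of a map, the third event above would have to be either dropped or added to the wrong hour. Keeping one entry per window end time lets both hours accumulate until each is known to be complete.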

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information
+  when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips);

Review comment:
       ```suggestion
       Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
   ```
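
To illustrate why the typed form is preferable, here is a minimal sketch. The `Tuple3` class below is a hypothetical stand-in written for this comment, not Flink's own tuple class, and `buildResult` is an assumed helper name. The point is only that a parameterized factory lets the compiler check the field types, which the raw `new Tuple3(...)` skips.

```java
public class TypedTupleSketch {

    // Hypothetical stand-in for a three-field tuple (NOT Flink's Tuple3).
    public static final class Tuple3<A, B, C> {
        public final A f0;
        public final B f1;
        public final C f2;

        private Tuple3(A f0, B f1, C f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }

        // Static factory: the type arguments are inferred by the compiler,
        // so the call site never needs a raw type.
        public static <A, B, C> Tuple3<A, B, C> of(A f0, B f1, C f2) {
            return new Tuple3<>(f0, f1, f2);
        }
    }

    // Assumed helper mirroring the result construction in onTimer().
    public static Tuple3<Long, Long, Float> buildResult(
            long driverId, long timestamp, Float sumOfTips) {
        return Tuple3.of(driverId, timestamp, sumOfTips);
    }

    public static void main(String[] args) {
        Tuple3<Long, Long, Float> result = buildResult(42L, 3_599_999L, 12.5f);
        System.out.println(result.f0 + ", " + result.f1 + ", " + result.f2);
    }
}
```

With the typed declaration, passing a value of the wrong type for one of the fields becomes a compile-time error instead of an unchecked warning.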

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same

Review comment:
       ```suggestion
   * This example uses a `MapState` where the keys are timestamps, and sets a `Timer` for that same
   ```
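
For readers of this thread, the pattern described in that bullet can be sketched without Flink. This is only an illustration under stated assumptions: a plain `HashMap` stands in for the per-key `MapState`, the class and method names (`WindowEndKeySketch`, `add`, `fire`) are made up for this comment, and event timestamps are assumed to be non-negative. The rounding formula is the one from `processElement()`.

```java
import java.util.HashMap;
import java.util.Map;

public class WindowEndKeySketch {

    // Round eventTime up to the last millisecond of the window containing it
    // (same formula as in processElement(); assumes eventTime >= 0).
    public static long endOfWindow(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    // Plain map standing in for the MapState of one key:
    // window end timestamp -> sum of tips for that window.
    private final Map<Long, Float> sumOfTips = new HashMap<>();
    private final long durationMsec;

    public WindowEndKeySketch(long durationMsec) {
        this.durationMsec = durationMsec;
    }

    // Adds a tip to its window and returns the timestamp for which
    // a timer would be registered (the same value used as the map key).
    public long add(long eventTime, float tip) {
        long end = endOfWindow(eventTime, durationMsec);
        sumOfTips.merge(end, tip, Float::sum);
        return end;
    }

    // Simulates the timer firing: the timer's timestamp is the map key,
    // so the lookup is direct. The entry is removed once it has been read.
    public Float fire(long timerTimestamp) {
        return sumOfTips.remove(timerTimestamp);
    }

    public static void main(String[] args) {
        WindowEndKeySketch window = new WindowEndKeySketch(3_600_000L);
        long timer = window.add(1_000L, 2.0f);
        window.add(2_000L, 3.5f);
        System.out.println("timer at " + timer + ", sum " + window.fire(timer));
    }
}
```

Because the timer timestamp and the map key are the same value, nothing has to be searched or scanned when the timer fires.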

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).

Review comment:
       You use this spelling a few lines further down. I also wouldn't consider "TimeWindow" a Flink term, so I think this is more appropriate:
   ```suggestion
   produced by the implementation that uses Flink's built-in time windows).
   ```

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information
+  when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips);
+    out.collect(result);
+    this.sumOfTips.remove(timestamp);
+}
+{% endhighlight %}
+
+Observations:
+
+* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key.
+
+* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
+  which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`,
+  which has the effect of making it impossible to accommodate late events. This is the equivalent of
+  setting the allowedLateness to zero when working with Flink's time windows.
+
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,

Review comment:
       FYI: I was actually surprised to read (and then see) that these are really only optimized for RocksDB. I thought the MVCC hashmap also extended its reach into the `MapState` on heap, but that doesn't seem to be the case (as you wrote).

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information
+  when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips);
+    out.collect(result);
+    this.sumOfTips.remove(timestamp);
+}
+{% endhighlight %}
+
+Observations:
+
+* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key.
+
+* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
+  which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`,
+  which has the effect of making it impossible to accommodate late events. This is the equivalent of
+  setting the allowedLateness to zero when working with Flink's time windows.
+
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
+these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
+state backend can append to `ListState` without going through ser/de, and for `MapState`, each
+key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated.
+
+{% top %}
+
+## Side Outputs
+
+### Introduction
+
+There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:
+
+* exceptions
+* malformed events
+* late events
+* operational alerts, such as timed-out connections to external services
+
+Side outputs are a convenient way to do this. 
+
+Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that
+correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the
+same name should have the same type, and will refer to the same side output.
+
+### Example
+
+You are now in a position to do something with the late events that were ignored in the previous
+section. In the `processElement` method of `PseudoWindow` you can now do this:
+
+{% highlight java %}
+if (eventTime <= timerService.currentWatermark()) {
+    // This event is late; its window has already been triggered.
+    OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};

Review comment:
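       Performance-wise, you should probably not create the `OutputTag` inside `processElement`, where it is instantiated again for every late event; define it once(!), outside the method.
   
   Since the tutorial shows both occurrences of the tag (where the late fare is emitted and where the job reads the side output), I would recommend showing it defined correctly.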
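
A rough illustration of the point about defining the tag once, written so that it runs without Flink on the classpath. `Tag<T>` here is a hypothetical stand-in for `OutputTag<T>` (it is not Flink's class), and `TagReuseSketch`, `LATE_FARES`, `tagCreatedPerEvent`, and `tagDefinedOnce` are names invented for this sketch. It assumes only what the tutorial text states: tags with the same name refer to the same side output. Under that assumption, a single shared constant behaves the same as a tag created per event, minus the repeated allocation.

```java
import java.util.Objects;

final class TagReuseSketch {

    /** Hypothetical stand-in for OutputTag<T>: two tags are equal when their names match. */
    static class Tag<T> {
        static int created = 0; // counts constructor calls, for illustration only
        final String id;

        Tag(String id) {
            this.id = Objects.requireNonNull(id);
            created++;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof Tag && ((Tag<?>) other).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    // Defined once, outside the per-event method, and shared by every call.
    static final Tag<String> LATE_FARES = new Tag<String>("lateFares") {};

    // What the tutorial snippet does: a fresh tag object for every late event.
    static Tag<String> tagCreatedPerEvent() {
        return new Tag<String>("lateFares") {};
    }

    // What the review suggests: reuse the single shared instance.
    static Tag<String> tagDefinedOnce() {
        return LATE_FARES;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            tagDefinedOnce();
        }
        int afterShared = Tag.created;
        for (int i = 0; i < 1000; i++) {
            tagCreatedPerEvent();
        }
        System.out.println("objects created by the shared tag: " + afterShared);
        System.out.println("objects created per event: " + (Tag.created - afterShared));
    }
}
```

In the tutorial itself the equivalent would presumably be a `static final` field on `PseudoWindow` (or on the job class) that both `ctx.output(...)` and `getSideOutput(...)` refer to.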

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information
+  when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips);
+    out.collect(result);
+    this.sumOfTips.remove(timestamp);
+}
+{% endhighlight %}
+
+Observations:
+
+* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key.
+
+* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
+  which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`,
+  which has the effect of making it impossible to accommodate late events. This is the equivalent of
+  setting the allowedLateness to zero when working with Flink's time windows.
+
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
+these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
+state backend can append to `ListState` without going through ser/de, and for `MapState`, each
+key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated.
+
+{% top %}
+
+## Side Outputs
+
+### Introduction
+
+There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:
+
+* exceptions
+* malformed events
+* late events
+* operational alerts, such as timed-out connections to external services

Review comment:
       All of these are some sort of error. Maybe also mention ordinary splits in application logic, for example routing events to different outputs depending on the input type?
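
To make the non-error case concrete, here is a plain-Java model of splitting by input type. It does not use the Flink API: `SplitByTypeSketch`, `Event`, `route`, and the type labels `"ride"` and `"fare"` are all hypothetical, and a `Map` of lists stands in for the main output plus named side outputs. The only idea it is meant to show is that a single operator can send each event to one of several typed outputs as part of normal application logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SplitByTypeSketch {

    /** Hypothetical event: a type label plus a payload. */
    static final class Event {
        final String type;
        final String payload;

        Event(String type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Name used for the regular (non-side) output in this model.
    static final String MAIN = "main";

    /** Sends "ride" events to the main output and every other type to an output named after the type. */
    static Map<String, List<Event>> route(List<Event> input) {
        Map<String, List<Event>> outputs = new HashMap<>();
        for (Event event : input) {
            String target = event.type.equals("ride") ? MAIN : event.type;
            outputs.computeIfAbsent(target, name -> new ArrayList<>()).add(event);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Event> input = new ArrayList<>();
        input.add(new Event("ride", "r1"));
        input.add(new Event("fare", "f1"));
        input.add(new Event("ride", "r2"));

        Map<String, List<Event>> outputs = route(input);
        System.out.println("main output size: " + outputs.get(MAIN).size());
        System.out.println("fare output size: " + outputs.get("fare").size());
    }
}
```

In a real job each named list would correspond to one `OutputTag` and one call to `getSideOutput`.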

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information
+  when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips);
+    out.collect(result);
+    this.sumOfTips.remove(timestamp);
+}
+{% endhighlight %}
+
+Observations:
+
+* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key.
+
+* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
+  which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`,
+  which has the effect of making it impossible to accommodate late events. This is the equivalent of
+  setting the allowedLateness to zero when working with Flink's time windows.
+
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
+these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
+state backend can append to `ListState` without going through ser/de, and for `MapState`, each
+key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated.
+
+{% top %}
+
+## Side Outputs
+
+### Introduction
+
+There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:
+
+* exceptions
+* malformed events
+* late events
+* operational alerts, such as timed-out connections to external services
+
+Side outputs are a convenient way to do this. 
+
+Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that
+correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the
+same name should have the same type, and will refer to the same side output.
+
+### Example
+
+You are now in a position to do something with the late events that were ignored in the previous
+section. In the `processElement` method of `PseudoWindow` you can now do this:
+
+{% highlight java %}
+if (eventTime <= timerService.currentWatermark()) {
+    // This event is late; its window has already been triggered.
+    OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};
+    ctx.output(lateFares, fare);
+} else {
+    . . .
+}
+{% endhighlight %}
+
+And the job can access this side output:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+SingleOutputStreamOperator hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+
+OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};
+hourlyTips.getSideOutput(lateFares).print();
+{% endhighlight %}
+
+{% top %}
+
+## Closing Remarks
+
+In this example you've seen how a ProcessFunction can be used to reimplement a straightforward time

Review comment:
       ```suggestion
   In this example you have seen how a `ProcessFunction` can be used to reimplement a straightforward time
   ```

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information
+  when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips);
+    out.collect(result);
+    this.sumOfTips.remove(timestamp);
+}
+{% endhighlight %}
+
+Observations:
+
+* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key.
+
+* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
+  which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`,
+  which has the effect of making it impossible to accommodate late events. This is the equivalent of
+  setting the allowedLateness to zero when working with Flink's time windows.
+
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
+these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
+state backend can append to `ListState` without going through ser/de, and for `MapState`, each
+key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated.
+
+{% top %}
+
+## Side Outputs
+
+### Introduction
+
+There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:
+
+* exceptions
+* malformed events
+* late events
+* operational alerts, such as timed-out connections to external services
+
+Side outputs are a convenient way to do this. 
+
+Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that
+correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the

Review comment:
       ```suggestion
   correspond to the type of the side output's `DataStream`, and they have names. Two `OutputTag`s with the
   ```

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc. 
+
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to look up relevant
+  information when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3<Long, Long, Float> result = new Tuple3<>(driverId, timestamp, sumOfTips);
+    out.collect(result);
+    this.sumOfTips.remove(timestamp);
+}
+{% endhighlight %}
+
+Observations:
+
+* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key.
+
+* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
+  which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`,
+  which has the effect of making it impossible to accommodate late events. This is the equivalent of
+  setting the allowedLateness to zero when working with Flink's time windows.
+
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
+these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
+state backend can append to `ListState` without going through ser/de, and for `MapState`, each
+key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated.
+
+{% top %}
+
+## Side Outputs
+
+### Introduction
+
+There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:
+
+* exceptions
+* malformed events
+* late events
+* operational alerts, such as timed-out connections to external services
+
+Side outputs are a convenient way to do this. 
+
+Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that
+correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the
+same name should have the same type, and will refer to the same side output.
+
+### Example
+
+You are now in a position to do something with the late events that were ignored in the previous
+section. In the `processElement` method of `PseudoWindow` you can now do this:
+
+{% highlight java %}
+if (eventTime <= timerService.currentWatermark()) {
+    // This event is late; its window has already been triggered.
+    OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};
+    ctx.output(lateFares, fare);
+} else {
+    . . .
+}
+{% endhighlight %}
+
+And the job can access this side output:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+SingleOutputStreamOperator hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+
+OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};
+hourlyTips.getSideOutput(lateFares).print();
+{% endhighlight %}
+
+{% top %}
+
+## Closing Remarks
+
+In this example you've seen how a ProcessFunction can be used to reimplement a straightforward time
+window. Of course, if Flink's built-in windowing API meets your needs, by all means, go ahead and
+use it. But if you find yourself considering doing something contorted with Flink's windows, don't
+be afraid to roll your own.
+
+Also, ProcessFunctions are useful for many other use cases beyond computing analytics. The hands-on
+exercise below provides an example of something completely different.
+
+Another common use case for ProcessFunctions is for expiring stale state. If you think back to the
+[Rides and Fares Exercise](https://github.com/apache/flink-training/tree/master/rides-and-fares),
+where a `RichCoFlatMapFunction` is used to compute a simple join, the sample solution assumes that
+the TaxiRides and TaxiFares are perfectly matched, one-to-one for each rideId. If an event is lost,

Review comment:
       I'm not sure whether to also put `TaxiFare` and `TaxiRide` into backticks since they append an "s" and that usually renders strangely...
   ```suggestion
   the TaxiRides and TaxiFares are perfectly matched, one-to-one for each `rideId`. If an event is lost,
   ```
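
As a side note on the time window that the quoted closing remarks refer to, the `endOfWindow` arithmetic from `processElement` can be checked in isolation. The sketch below is plain Java, not Flink code: `PseudoWindowMath` is a hypothetical name, a `TreeMap` stands in for the `MapState<Long, Float>`, and non-negative timestamps are assumed.

```java
import java.util.TreeMap;

// Plain-Java check of the tutorial's window arithmetic; not Flink code.
class PseudoWindowMath {
    // Same expression as in the quoted processElement: the last millisecond of the
    // window that contains eventTime (assumes eventTime >= 0).
    static long endOfWindow(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        // window end -> running sum of tips, standing in for MapState<Long, Float>
        TreeMap<Long, Float> sumOfTips = new TreeMap<>();
        long[] eventTimes = {0L, hour - 1, hour, hour + 5};
        float[] tips = {1.0f, 2.0f, 4.0f, 8.0f};
        for (int i = 0; i < eventTimes.length; i++) {
            // Add each tip to the total for the window its timestamp falls into.
            sumOfTips.merge(endOfWindow(eventTimes[i], hour), tips[i], Float::sum);
        }
        System.out.println(sumOfTips);
    }
}
```

The first two events share the window ending at `hour - 1`; an event stamped exactly on the hour belongs to the next window.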

##########
File path: docs/tutorials/event_driven.md
##########
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
+these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
+state backend can append to `ListState` without going through ser/de, and for `MapState`, each

Review comment:
       For a training/hands-on tutorial, I would spell out ser/de, perhaps like this:
   ```suggestion
   state backend can append to `ListState` without going through (de)serialization, and for `MapState`, each
   ```
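
To make the (de)serialization point concrete for tutorial readers, here is a toy model in plain Java. It is a sketch only: `AppendCostSketch` and both method names are hypothetical, the byte arrays merely stand in for whatever a state backend stores, and nothing here reflects how RocksDB or Flink actually implement `ListState`. It contrasts a collection kept inside a single value, which is decoded and re-encoded in full on every append, with an append-only list that encodes just the new element.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy model only: contrasts a whole-collection value with an append-only list.
class AppendCostSketch {

    // Collection held in one value: decode everything, add one element, encode everything.
    static byte[] appendViaWholeValue(byte[] stored, long element) {
        ByteBuffer in = ByteBuffer.wrap(stored);
        List<Long> values = new ArrayList<>();
        while (in.remaining() >= Long.BYTES) {
            values.add(in.getLong());            // deserialize every existing element
        }
        values.add(element);
        ByteBuffer out = ByteBuffer.allocate(values.size() * Long.BYTES);
        for (long v : values) {
            out.putLong(v);                      // re-serialize every element
        }
        return out.array();
    }

    // Append-only list: existing bytes are copied as-is, only the new element is encoded.
    static byte[] appendViaListState(byte[] stored, long element) {
        ByteBuffer out = ByteBuffer.allocate(stored.length + Long.BYTES);
        out.put(stored);                         // never decoded
        out.putLong(element);
        return out.array();
    }

    public static void main(String[] args) {
        byte[] whole = new byte[0];
        byte[] appended = new byte[0];
        for (long v = 1; v <= 3; v++) {
            whole = appendViaWholeValue(whole, v);
            appended = appendViaListState(appended, v);
        }
        System.out.println(whole.length + " bytes vs " + appended.length + " bytes");
    }
}
```

Both paths end up with the same bytes; the difference is that the first one touches every existing element on each append.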

##########
File path: docs/tutorials/event_driven.md
##########
+Another common use case for ProcessFunctions is for expiring stale state. If you think back to the
+[Rides and Fares Exercise](https://github.com/apache/flink-training/tree/master/rides-and-fares),
+where a `RichCoFlatMapFunction` is used to compute a simple join, the sample solution assumes that
+the TaxiRides and TaxiFares are perfectly matched, one-to-one for each rideId. If an event is lost,
+the other event for the same rideId will be held in state forever. This could instead be implemented

Review comment:
       ```suggestion
   the other event for the same `rideId` will be held in state forever. This could instead be implemented
   ```
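
The problem the quoted paragraph raises (an unmatched event held in state forever) can be pictured with a small plain-Java model. Everything here is an assumption made for illustration: `ExpiringJoinSketch`, `onEvent`, and `onWatermark` are hypothetical names, the model does not distinguish rides from fares, and the fixed timeout is an arbitrary choice. How the tutorial itself goes on to address this is not shown in the quoted lines.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Plain-Java model of expiring unmatched join state; all names are hypothetical.
class ExpiringJoinSketch {
    private final long timeoutMsec;
    // rideId -> time at which the waiting (unmatched) event should be discarded
    private final Map<Long, Long> expiryByRideId = new HashMap<>();

    ExpiringJoinSketch(long timeoutMsec) {
        this.timeoutMsec = timeoutMsec;
    }

    // Returns true if this event completed a pair; otherwise stores it with an expiry time.
    boolean onEvent(long rideId, long eventTime) {
        if (expiryByRideId.remove(rideId) != null) {
            return true;   // the partner was waiting: the pair is complete and state is cleared
        }
        expiryByRideId.put(rideId, eventTime + timeoutMsec);
        return false;
    }

    // Plays the role of a timer callback: drop every waiting event whose expiry time is reached.
    int onWatermark(long watermark) {
        int expired = 0;
        Iterator<Map.Entry<Long, Long>> it = expiryByRideId.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() <= watermark) {
                it.remove();
                expired++;
            }
        }
        return expired;
    }

    int waiting() {
        return expiryByRideId.size();
    }

    public static void main(String[] args) {
        ExpiringJoinSketch join = new ExpiringJoinSketch(1_000L);
        join.onEvent(1L, 0L);     // first event for ride 1 waits
        join.onEvent(2L, 100L);   // first event for ride 2 waits
        join.onEvent(1L, 50L);    // partner for ride 1 arrives
        System.out.println("expired=" + join.onWatermark(1_100L) + " waiting=" + join.waiting());
    }
}
```

Without the `onWatermark` step, the entry for ride 2 would stay in the map indefinitely, which is the leak the paragraph describes.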

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of `ProcessFunction`s -- this is a `KeyedProcessFunction`, but there are also
+  `CoProcessFunction`s, `BroadcastProcessFunction`s, etc.
+
+* A `KeyedProcessFunction` is a kind of `RichFunction`. Being a `RichFunction`, it has access to the `open`
+  and `getRuntimeContext` methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to look up relevant information
+  when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3<Long, Long, Float> result = new Tuple3<>(driverId, timestamp, sumOfTips);
+    out.collect(result);
+    this.sumOfTips.remove(timestamp);
+}
+{% endhighlight %}
+
+Observations:
+
+* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key.
+
+* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
+  which point `onTimer` is called. This `onTimer` method removes the related entry from `sumOfTips`,
+  which has the effect of making it impossible to accommodate late events. This is the equivalent of
+  setting the `allowedLateness` to zero when working with Flink's time windows.
+
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
+these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
+state backend can append to `ListState` without going through ser/de, and for `MapState`, each
+key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated.
+
+{% top %}
+
+## Side Outputs
+
+### Introduction
+
+There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:
+
+* exceptions
+* malformed events
+* late events
+* operational alerts, such as timed-out connections to external services
+
+Side outputs are a convenient way to do this. 
+
+Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that
+correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the
+same name should have the same type, and will refer to the same side output.
+
+### Example
+
+You are now in a position to do something with the late events that were ignored in the previous
+section. In the `processElement` method of `PseudoWindow` you can now do this:
+
+{% highlight java %}
+if (eventTime <= timerService.currentWatermark()) {
+    // This event is late; its window has already been triggered.
+    OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};
+    ctx.output(lateFares, fare);
+} else {
+    . . .
+}
+{% endhighlight %}
+
+And the job can access this side output:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+SingleOutputStreamOperator<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+
+OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};
+hourlyTips.getSideOutput(lateFares).print();
+{% endhighlight %}
+
+{% top %}
+
+## Closing Remarks
+
+In this example you've seen how a ProcessFunction can be used to reimplement a straightforward time
+window. Of course, if Flink's built-in windowing API meets your needs, by all means, go ahead and
+use it. But if you find yourself considering doing something contorted with Flink's windows, don't
+be afraid to roll your own.
+
+Also, ProcessFunctions are useful for many other use cases beyond computing analytics. The hands-on

Review comment:
       ```suggestion
   Also, `ProcessFunctions` are useful for many other use cases beyond computing analytics. The hands-on
   ```







[GitHub] [flink] alpinegizmo commented on a change in pull request #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
alpinegizmo commented on a change in pull request #11845:
URL: https://github.com/apache/flink/pull/11845#discussion_r412769939



##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall

Review comment:
       fixed







[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * 6c5a02a0608ddcee595d7cad7bfaeacbaf9b46e0 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/161373359) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30) 
   * cc18558988ef82e5f925e7a3ef3d197a87a5ee80 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161384948) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=44) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] alpinegizmo commented on a change in pull request #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
alpinegizmo commented on a change in pull request #11845:
URL: https://github.com/apache/flink/pull/11845#discussion_r412684184



##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl

Review comment:
       good catch!

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of `ProcessFunction`s -- this is a `KeyedProcessFunction`, but there are also
+  `CoProcessFunction`s, `BroadcastProcessFunction`s, etc.
+
+* A `KeyedProcessFunction` is a kind of `RichFunction`. Being a `RichFunction`, it has access to the `open`
+  and `getRuntimeContext` methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to look up relevant information
+  when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3<Long, Long, Float> result = new Tuple3<>(driverId, timestamp, sumOfTips);
+    out.collect(result);
+    this.sumOfTips.remove(timestamp);
+}
+{% endhighlight %}
+
+Observations:
+
+* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key.
+
+* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
+  which point `onTimer` is called. This `onTimer` method removes the related entry from `sumOfTips`,
+  which has the effect of making it impossible to accommodate late events. This is the equivalent of
+  setting the `allowedLateness` to zero when working with Flink's time windows.
+
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
+these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
+state backend can append to `ListState` without going through ser/de, and for `MapState`, each
+key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated.
+
+{% top %}
+
+## Side Outputs
+
+### Introduction
+
+There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:
+
+* exceptions
+* malformed events
+* late events
+* operational alerts, such as timed-out connections to external services
+
+Side outputs are a convenient way to do this. 
+
+Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that
+correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the
+same name should have the same type, and will refer to the same side output.
+
+### Example
+
+You are now in a position to do something with the late events that were ignored in the previous
+section. In the `processElement` method of `PseudoWindow` you can now do this:
+
+{% highlight java %}
+if (eventTime <= timerService.currentWatermark()) {
+    // This event is late; its window has already been triggered.
+    OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};

Review comment:
       Sigh, I debated this for a while before putting it inside. Problem is, moving it outside requires showing a lot more code, which is distracting, or using a static final OutputTag, which I guess is what I'll do. I've rewritten that section.
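
       For reference, here is a minimal sketch of the `static final` variant. So that it runs without Flink on the classpath, it uses a hypothetical stand-in class `Tag<T>` instead of Flink's `OutputTag<T>`, and it assumes (as the tutorial text says) that tags with the same name refer to the same side output. The names `PseudoWindowSketch`, `Tag`, and `LATE_FARES` are illustrative only and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PseudoWindowSketch {

    // Hypothetical stand-in for Flink's OutputTag<T>: identity is the name only.
    public static final class Tag<T> {
        private final String name;

        public Tag(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tag && ((Tag<?>) o).name.equals(name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    // Declared once as a constant, so the function that emits late events and
    // the job that reads them share the same tag instead of re-creating it.
    public static final Tag<Long> LATE_FARES = new Tag<>("lateFares");

    // Hypothetical stand-in for the side-output channels of a single operator.
    private final Map<Tag<?>, List<Object>> sideOutputs = new HashMap<>();

    // Mirrors the late-event branch of processElement: an event at or behind
    // the watermark is routed to the side output instead of being dropped.
    public void process(long eventTime, long currentWatermark) {
        if (eventTime <= currentWatermark) {
            sideOutputs.computeIfAbsent(LATE_FARES, k -> new ArrayList<>()).add(eventTime);
        }
    }

    // Returns the events collected for the given tag (empty if there are none).
    public List<Object> getSideOutput(Tag<?> tag) {
        return sideOutputs.getOrDefault(tag, new ArrayList<>());
    }

    public static void main(String[] args) {
        PseudoWindowSketch op = new PseudoWindowSketch();
        op.process(5L, 10L);   // late: behind the watermark
        op.process(20L, 10L);  // on time: not sent to the side output
        System.out.println(op.getSideOutput(LATE_FARES));
    }
}
```

       In the real tutorial code the constant would be an `OutputTag<TaxiFare>` shared by `PseudoWindow` and the job, but that detail is left to the rewritten section.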

##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends 
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp, 
+            OnTimerContext context, 
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of `ProcessFunction`s -- this is a `KeyedProcessFunction`, but there are also
+  `CoProcessFunction`s, `BroadcastProcessFunction`s, etc.
+
+* A `KeyedProcessFunction` is a kind of `RichFunction`. Being a `RichFunction`, it has access to the `open`
+  and `getRuntimeContext` methods needed for working with managed keyed state.
+
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to look up relevant
+  information when the timer fires.
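As a rough illustration of the first point, the sketch below records late events instead of silently dropping them. It is plain Java and does not use Flink's side-output API: the `lateEvents` list is only a hypothetical stand-in for a side output, and `LateEventRouter`, `DURATION_MSEC`, and the explicit `currentWatermark` parameter are assumptions made so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analog of the late/on-time branch in processElement().
final class LateEventRouter {
    static final long DURATION_MSEC = 60 * 60 * 1000L; // assumed one-hour windows

    final Map<Long, Float> sumOfTips = new HashMap<>();
    final List<Long> lateEvents = new ArrayList<>(); // hypothetical stand-in for a side output

    void process(long eventTime, float tip, long currentWatermark) {
        if (eventTime <= currentWatermark) {
            // Late: the window has already been triggered, so record the event instead.
            lateEvents.add(eventTime);
        } else {
            long endOfWindow = eventTime - (eventTime % DURATION_MSEC) + DURATION_MSEC - 1;
            sumOfTips.merge(endOfWindow, tip, Float::sum);
        }
    }
}
```

Keeping the late events somewhere visible makes it possible to count them or reprocess them later, which is the motivation for using a side output in the real job.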
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp, 
+        OnTimerContext context, 
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+    Float sumOfTips = this.sumOfTips.get(timestamp);
+
+    Tuple3<Long, Long, Float> result = new Tuple3<>(driverId, timestamp, sumOfTips);
+    out.collect(result);
+    this.sumOfTips.remove(timestamp);
+}
+{% endhighlight %}
+
+Observations:
+
+* The `OnTimerContext context` passed to `onTimer` can be used to determine the current key.
+
+* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at
+  which point `onTimer` is called. The `onTimer` method removes the related entry from `sumOfTips`,
+  which has the effect of making it impossible to accommodate late events. This is the equivalent of
+  setting `allowedLateness` to zero when working with Flink's time windows.
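The triggering behavior described above can be sketched in plain Java, again without Flink. A `TreeMap` whose keys are window end times plays the role of both the state and the registered timers; `TimerFiringSketch` and `advanceWatermark` are hypothetical names, and the real runtime invokes `onTimer` once per timer rather than returning a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java analog of event-time timers firing as the watermark advances.
final class TimerFiringSketch {
    // Keys are window end times and double as timer timestamps.
    private final TreeMap<Long, Float> sumOfTips = new TreeMap<>();

    void put(long windowEnd, float sum) {
        sumOfTips.put(windowEnd, sum);
    }

    // Fires every "timer" whose timestamp is at or before the watermark, oldest first.
    List<String> advanceWatermark(long watermark) {
        List<String> results = new ArrayList<>();
        while (!sumOfTips.isEmpty() && sumOfTips.firstKey() <= watermark) {
            // Look up the finished window's result and remove its state in one step.
            Map.Entry<Long, Float> fired = sumOfTips.pollFirstEntry();
            results.add(fired.getKey() + "=" + fired.getValue());
        }
        return results;
    }

    boolean hasWindow(long windowEnd) {
        return sumOfTips.containsKey(windowEnd);
    }
}
```

Once a window's entry has been removed, nothing remains for a late event to be added to, which matches the zero-lateness behavior noted in the observations.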
+
+### Performance Considerations
+
+Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,
+these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB
+state backend can append to `ListState` without going through (de)serialization, and for
+`MapState`, each key/value pair is a separate RocksDB object, so `MapState` can be efficiently
+accessed and updated.
+
+{% top %}
+
+## Side Outputs
+
+### Introduction
+
+There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:
+
+* exceptions
+* malformed events
+* late events
+* operational alerts, such as timed-out connections to external services

Review comment:
       done







[GitHub] [flink] flinkbot commented on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617225471


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 9845d84455ca175b046dc1a4e400aa0ec11f7619 (Tue Apr 21 14:44:52 UTC 2020)
   
   **Warnings:**
    * Documentation files were touched, but no `.zh.md` files: Update Chinese documentation or file Jira ticket.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * cc18558988ef82e5f925e7a3ef3d197a87a5ee80 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161384948) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=44) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * e30b1b23a9eb209b9fdac786aed7010577d32e08 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161296189) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7885) 
   



[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * e30b1b23a9eb209b9fdac786aed7010577d32e08 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161296189) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7885) 
   * 6c5a02a0608ddcee595d7cad7bfaeacbaf9b46e0 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * 9845d84455ca175b046dc1a4e400aa0ec11f7619 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161267645) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7878) 
   * e30b1b23a9eb209b9fdac786aed7010577d32e08 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * e30b1b23a9eb209b9fdac786aed7010577d32e08 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161296189) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7885) 
   * 6c5a02a0608ddcee595d7cad7bfaeacbaf9b46e0 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161373359) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30) 
   * cc18558988ef82e5f925e7a3ef3d197a87a5ee80 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * 6c5a02a0608ddcee595d7cad7bfaeacbaf9b46e0 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/161373359) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30) 
   * cc18558988ef82e5f925e7a3ef3d197a87a5ee80 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161384948) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=44) 
   



[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * 9845d84455ca175b046dc1a4e400aa0ec11f7619 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161267645) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7878) 
   



[GitHub] [flink] flinkbot edited a comment on issue #11845: [FLINK-17240][docs] Add tutorial on Event-driven Apps

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11845:
URL: https://github.com/apache/flink/pull/11845#issuecomment-617236902


   ## CI report:
   
   * e30b1b23a9eb209b9fdac786aed7010577d32e08 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161296189) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7885) 
   * 6c5a02a0608ddcee595d7cad7bfaeacbaf9b46e0 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161373359) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30) 
   