Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/21 11:01:44 UTC

[GitHub] [flink] alpinegizmo opened a new pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

alpinegizmo opened a new pull request #11842:
URL: https://github.com/apache/flink/pull/11842


   ## What is the purpose of the change
   
   This pull request adds a tutorial on data pipelines and ETL. It covers map, flatmap, keyby, keyed state, and connected streams.
   
   ## Brief change log
   
   Adds docs/tutorials/etl.md and associated figures.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] alpinegizmo commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
alpinegizmo commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412348966



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
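The requirement above for valid `hashCode()` and `equals()` implementations can be checked in plain Java, without Flink. The following is a minimal sketch: the class `KeyContracts` and the composite key `CellAndHour` are hypothetical names introduced only for illustration. It shows why an array makes a poor key (its `equals()` compares object identity, not contents) while a small value class with field-based `equals()` and `hashCode()` behaves as a key should.

```java
import java.util.Objects;

public class KeyContracts {

    // Hypothetical composite key (grid cell plus hour of day), for illustration only.
    static final class CellAndHour {
        final int cell;
        final int hour;

        CellAndHour(int cell, int hour) {
            this.cell = cell;
            this.hour = hour;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof CellAndHour)) {
                return false;
            }
            CellAndHour other = (CellAndHour) o;
            return cell == other.cell && hour == other.hour;
        }

        @Override
        public int hashCode() {
            return Objects.hash(cell, hour);
        }
    }

    // Two arrays with identical contents are still different objects,
    // and array equals() is identity-based, so they never match as keys.
    public static boolean arraysBehaveAsEqualKeys() {
        int[] a = {50797, 8};
        int[] b = {50797, 8};
        return a.equals(b) && a.hashCode() == b.hashCode();
    }

    // Two value objects with identical fields are equal and hash identically.
    public static boolean compositeKeysBehaveAsEqualKeys() {
        CellAndHour a = new CellAndHour(50797, 8);
        CellAndHour b = new CellAndHour(50797, 8);
        return a.equals(b) && a.hashCode() == b.hashCode();
    }

    public static void main(String[] args) {
        System.out.println("arrays usable as keys:         " + arraysBehaveAsEqualKeys());
        System.out.println("composite keys usable as keys: " + compositeKeysBehaveAsEqualKeys());
    }
}
```

Running `main` reports `false` for the array case and `true` for the composite key.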
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink has to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
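To make the idea of a rolling, per-key aggregation concrete, here is a Flink-free sketch in plain Java. It is not Flink's implementation: the class `RollingReduceSketch` and its method are hypothetical, and a `HashMap` stands in for the per-key state that Flink would manage. It assumes the rolling behaviour of combining each incoming value with the value stored for its key and emitting the updated result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class RollingReduceSketch {

    // For each (key, value) pair, combine the value with the aggregate stored
    // for that key and record the updated aggregate as an output element.
    public static List<String> rollingReduce(int[][] keyedValues, BinaryOperator<Integer> reducer) {
        Map<Integer, Integer> statePerKey = new HashMap<>(); // one entry per distinct key
        List<String> emitted = new ArrayList<>();
        for (int[] kv : keyedValues) {
            int key = kv[0];
            int value = kv[1];
            // merge() stores the value if the key is new, else applies the reducer.
            Integer updated = statePerKey.merge(key, value, reducer);
            emitted.add("(" + key + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[][] minutesByStartCell = {
            {50797, 6}, {51549, 14}, {50797, 8}, {50797, 11}, {51549, 9}
        };
        // Using max as the reducer keeps the longest duration seen so far per cell.
        rollingReduce(minutesByStartCell, Integer::max).forEach(System.out::println);
    }
}
```

Note that the map holds one entry per distinct key and never shrinks, which is the state-size concern raised in the section on implicit state.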
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried via a REST API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 
+
+Our `Deduplicator` class has two methods: `open()` and `flatMap()`. The open method establishes the
+use of managed state by defining a `ValueStateDescriptor<Boolean>`. The arguments to the constructor
+specify a name for this item of keyed state ("keyHasBeenSeen"), and provide information that can be
+used to serialize these objects (in this case, `Types.BOOLEAN`).
+
+{% highlight java %}
+public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
+    ValueState<Boolean> keyHasBeenSeen;
+
+    @Override
+    public void open(Configuration conf) {
+        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
+        keyHasBeenSeen = getRuntimeContext().getState(desc);
+    }
+
+    @Override
+    public void flatMap(Event event, Collector<Event> out) throws Exception {
+        if (keyHasBeenSeen.value() == null) {
+            out.collect(event);
+            keyHasBeenSeen.update(true);
+        }
+    }
+}
+{% endhighlight %}
+
+When the flatMap method calls `keyHasBeenSeen.value()`, Flink's runtime looks up the value of this
+piece of state _for the key in context_, and only if it is `null` does it go ahead and collect the
+event to the output. It also updates `keyHasBeenSeen` to `true` in this case. 
+
+This mechanism for accessing and updating key-partitioned state may seem rather magical, since the
+key is not explicitly visible in the implementation of our `Deduplicator`. When Flink's runtime
+calls the `open` method of our `RichFlatMapFunction`, there is no event, and thus no key in context
+at that moment. But when it calls the `flatMap` method, the key for the event being processed is
+available to the runtime, and is used behind the scenes to determine which entry in Flink's state
+backend is being operated on. 
+
+When deployed to a distributed cluster, there will be many instances of this `Deduplicator`, each of
+which will be responsible for a disjoint subset of the entire keyspace. Thus, when you see a single
+item of `ValueState`, such as
+
+{% highlight java %}
+ValueState<Boolean> keyHasBeenSeen;
+{% endhighlight %}
+
+understand that this represents not just a single Boolean, but rather a distributed, sharded, key/value store.
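One way to picture the key/value store behind a single `ValueState<Boolean>` is the following plain-Java sketch. It is only an analogy, not how Flink's state backends work: the class `KeyedStateSketch` is hypothetical, and an ordinary `HashMap` stands in for the runtime's storage for one parallel instance. Events are represented by their keys alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedStateSketch {

    // Emits only the first event seen for each key, mirroring the
    // null-check-then-update logic of Deduplicator.flatMap().
    public static List<String> deduplicate(List<String> eventKeys) {
        // Stand-in for the per-key entries behind ValueState<Boolean> keyHasBeenSeen.
        Map<String, Boolean> keyHasBeenSeen = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String key : eventKeys) {
            // The lookup is implicitly scoped to the current event's key.
            if (keyHasBeenSeen.get(key) == null) {
                out.add(key);
                keyHasBeenSeen.put(key, true);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(deduplicate(Arrays.asList("a", "b", "a", "c", "b")));
    }
}
```

In the real operator the map lookup is hidden: the runtime selects the entry for the key of the event currently being processed, which is why the key never appears in `Deduplicator`.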
+
+### Clearing State
+
+There's a potential problem with the example above: What will happen if the key space is unbounded?
+Somewhere, Flink is storing an instance of `Boolean` for every distinct key that is used. If there's
+a bounded set of keys then this will be fine, but in applications where the set of keys is growing
+in an unbounded way, it's necessary to clear the state for keys that are no longer needed. This is
+done by calling `clear()` on the state object, as in:
+
+{% highlight java %}
+keyHasBeenSeen.clear();
+{% endhighlight %}
+
+You might want to do this, for example, after a period of inactivity for a given key. You'll see how
+to use Timers to do this when you learn about `ProcessFunction`s in the section on [event-driven
+applications]({{ site.baseurl }}{% link tutorials/event_driven.md %}#process-functions).
+
+There's also a [State Time-to-Live (TTL)]({{ site.baseurl }}{% link dev/stream/state/state.md
+%}#state-time-to-live-ttl) option that you can configure with the state descriptor that specifies
+when you want the state for stale keys to be automatically cleared.
+
+### Non-keyed State
+
+It is also possible to work with managed state in non-keyed contexts. This is sometimes called
+[operator state]({{ site.baseurl }}{% link dev/stream/state/state.md %}#operator-state). The
+interfaces involved are somewhat different, and since it is unusual for user-defined functions to
+need non-keyed state, it is not covered here. This feature is most often used in the implementation
+of sources and sinks. 
+
+{% top %}
+
+## Connected Streams
+
+Sometimes instead of applying a pre-defined transformation like this:
+
+<img src="{{ site.baseurl }}/fig/transformation.svg" alt="simple transformation" class="offset" width="45%" />
+
+you want to be able to dynamically alter some aspects of the transformation -- by streaming in
+thresholds, or rules, or other parameters. The pattern in Flink that supports this is something
+called _connected streams_, wherein a single operator has two input streams, like this:
+
+<img src="{{ site.baseurl }}/fig/connected-streams.svg" alt="connected streams" class="offset" width="45%" />
+
+Connected streams can also be used to implement streaming joins.
+
+### Example
+
+In this example a control stream is used to specify words which must be filtered out of the
+`streamOfWords`. A `RichCoFlatMapFunction` called `ControlFunction` is applied to the connected
+streams to get this done. 
+
+{% highlight java %}
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
+    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
+  
+    control
+        .connect(streamOfWords)
+        .flatMap(new ControlFunction())
+        .print();
+
+    env.execute();
+}
+{% endhighlight %}
+
+Note that the two streams being connected must be keyed in compatible ways -- either both streams
+are not keyed, or both are keyed, and if they are both keyed, the key values have to be the same. In
+this case the streams are both of type `DataStream<String>`, and both streams are keyed by the
+string. As you will see below, this `RichCoFlatMapFunction` is storing a Boolean value in keyed
+state, and this Boolean is shared by the two streams.
+
+{% highlight java %}
+public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
+    private ValueState<Boolean> blocked;
+      
+    @Override
+    public void open(Configuration config) {
+        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
+    }
+      
+    @Override
+    public void flatMap1(String control_value, Collector<String> out) throws Exception {
+        blocked.update(Boolean.TRUE);
+    }
+      
+    @Override
+    public void flatMap2(String data_value, Collector<String> out) throws Exception {
+        if (blocked.value() == null) {
+            out.collect(data_value);
+        }
+    }
+}
+{% endhighlight %}
+
+A `RichCoFlatMapFunction` is a kind of `FlatMapFunction` that can be applied to a pair of connected
+streams, and it has access to the rich function interface. This means that it can be made stateful.
+
+The `blocked` Boolean is being used to remember the keys (words, in this case) that have been
+mentioned on the `control` stream, and those words are being filtered out of the `streamOfWords`
+stream. This is _keyed_ state, and it is shared between the two streams, which is why the two
+streams have to share the same keyspace.
+
+`flatMap1` and `flatMap2` are called by the Flink runtime with elements from each of the two
+connected streams -- in our case, elements from the `control` stream are passed into `flatMap1`, and
+elements from `streamOfWords` are passed into `flatMap2`. This is determined by the order in which
+the two streams are connected with `control.connect(streamOfWords)`. 
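The interplay of the two callbacks and the shared keyed state can be imitated in plain Java. This is a sketch under stated assumptions, not Flink code: the class `ControlSketch` is hypothetical, a `HashMap` stands in for the keyed `blocked` state, and the interleaving of the two inputs is supplied explicitly as a list of tagged events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ControlSketch {

    // Each event is {input, word}, where input is "control" or "data".
    // The array order represents one possible interleaving of the two streams.
    public static List<String> run(String[][] events) {
        Map<String, Boolean> blocked = new HashMap<>(); // stand-in for keyed state
        List<String> out = new ArrayList<>();
        for (String[] event : events) {
            String input = event[0];
            String word = event[1];
            if (input.equals("control")) {
                blocked.put(word, Boolean.TRUE);   // plays the role of flatMap1
            } else if (blocked.get(word) == null) {
                out.add(word);                     // plays the role of flatMap2
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] controlFirst = {
            {"control", "DROP"}, {"control", "IGNORE"},
            {"data", "Apache"}, {"data", "DROP"}, {"data", "Flink"}, {"data", "IGNORE"}
        };
        String[][] dataRacesAhead = {
            {"data", "Apache"}, {"data", "DROP"},
            {"control", "DROP"}, {"control", "IGNORE"},
            {"data", "Flink"}, {"data", "IGNORE"}
        };
        System.out.println(run(controlFirst));
        System.out.println(run(dataRacesAhead));
    }
}
```

With the control events first, only `Apache` and `Flink` are emitted. When a data event for `DROP` arrives before the matching control event, `DROP` is emitted as well, so the result depends on the interleaving.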
+
+It is important to recognize that you have no control over the order in which the `flatMap1` and
+`flatMap2` callbacks are called. These two input streams are racing against each other, and the
+Flink runtime will do what it wants to regarding consuming events from one stream or the other. In
+cases where timing and/or ordering matter, you may find it necessary to buffer events in managed
+Flink state until your application is ready to process them. (Note: if you are truly desperate, it
+is possible to exert some limited control over the order in which a two-input operator consumes its
+inputs by using a custom Operator that implements the
+[InputSelectable](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html)

Review comment:
       Right. I think it's site.javadocs_baseurl.







[GitHub] [flink] flinkbot edited a comment on issue #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11842:
URL: https://github.com/apache/flink/pull/11842#issuecomment-617121796


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c49d20891408e825c247a91d668d525927d44c87",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161228600",
       "triggerID" : "c49d20891408e825c247a91d668d525927d44c87",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c49d20891408e825c247a91d668d525927d44c87",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7859",
       "triggerID" : "c49d20891408e825c247a91d668d525927d44c87",
       "triggerType" : "PUSH"
     }, {
       "hash" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c49d20891408e825c247a91d668d525927d44c87 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161228600) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7859) 
   * eefa417ee0e3ca8248d77c28d42c7bd48bca1306 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] NicoK commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
NicoK commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412865741



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink has to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
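
The growth of this implicit state can be pictured with a plain-Java model. This is a sketch under simplifying assumptions, not Flink code: `RunningMaxModel` is a hypothetical class, and a `HashMap` stands in for whatever state backend Flink actually uses.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not Flink code) of the per-key state behind a running maximum.
final class RunningMaxModel {
    // One entry per distinct key ever seen; nothing is ever removed.
    private final Map<Integer, Integer> maxByKey = new HashMap<>();

    // Records a duration for a key and returns the current maximum for that key.
    int add(int startCell, int minutes) {
        return maxByKey.merge(startCell, minutes, Integer::max);
    }

    // Number of entries held, i.e. the number of distinct keys seen so far.
    int stateSize() {
        return maxByKey.size();
    }

    public static void main(String[] args) {
        RunningMaxModel model = new RunningMaxModel();
        model.add(50797, 6);
        model.add(50797, 8);
        model.add(64549, 5);
        System.out.println("max for 50797: " + model.add(50797, 7));
        System.out.println("entries held:  " + model.stateSize());
    }
}
```

The map gains an entry for every new key and never shrinks, which is why an unbounded key space implies unbounded state.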
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried via a REST API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 

Review comment:
       I had something like this in mind, maybe also showing a concrete example with a few system and user keys for `MapState`...
   
   ```
   key -> ValueState<Type>
   key -> ListState<Type>
   key -> MapState<Key, Value>
   ```
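
One possible way to make that mapping concrete is a plain-Java model in which each kind of keyed state is a map whose outer key is the stream key. This is only a conceptual sketch: `KeyedStateShapes`, its fields, and the sample keys are hypothetical, and the collections are ordinary `java.util` types rather than Flink's state classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only: plain collections standing in for keyed state.
final class KeyedStateShapes {
    // key -> ValueState<Boolean>: a single object per stream key
    final Map<String, Boolean> valueState = new HashMap<>();
    // key -> ListState<Long>: a list of elements per stream key
    final Map<String, List<Long>> listState = new HashMap<>();
    // key -> MapState<String, Integer>: the inner (user) key is separate from the stream key
    final Map<String, Map<String, Integer>> mapState = new HashMap<>();

    void addToList(String streamKey, long element) {
        listState.computeIfAbsent(streamKey, k -> new ArrayList<>()).add(element);
    }

    void putInMap(String streamKey, String userKey, int value) {
        mapState.computeIfAbsent(streamKey, k -> new HashMap<>()).put(userKey, value);
    }

    public static void main(String[] args) {
        KeyedStateShapes state = new KeyedStateShapes();
        state.valueState.put("driver-7", true);
        state.addToList("driver-7", 1587466904000L);
        state.putInMap("driver-7", "rides", 3);
        state.putInMap("driver-9", "rides", 1);
        System.out.println(state.mapState);
    }
}
```

In the `MapState` case the same user key (`"rides"`) appears under two stream keys without conflict, which is the distinction between system and user keys mentioned above.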




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on issue #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11842:
URL: https://github.com/apache/flink/pull/11842#issuecomment-617121796


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c49d20891408e825c247a91d668d525927d44c87",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161228600",
       "triggerID" : "c49d20891408e825c247a91d668d525927d44c87",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c49d20891408e825c247a91d668d525927d44c87",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7859",
       "triggerID" : "c49d20891408e825c247a91d668d525927d44c87",
       "triggerType" : "PUSH"
     }, {
       "hash" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7882",
       "triggerID" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "triggerType" : "PUSH"
     }, {
       "hash" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "status" : "CANCELED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161289068",
       "triggerID" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "triggerType" : "PUSH"
     }, {
       "hash" : "41e342cbfadb4c60b3c816ba956f26805c5c23e9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "41e342cbfadb4c60b3c816ba956f26805c5c23e9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eefa417ee0e3ca8248d77c28d42c7bd48bca1306 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/161289068) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7882) 
   * 41e342cbfadb4c60b3c816ba956f26805c5c23e9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on issue #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11842:
URL: https://github.com/apache/flink/pull/11842#issuecomment-617110043


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit c49d20891408e825c247a91d668d525927d44c87 (Tue Apr 21 11:05:20 UTC 2020)
   
   **Warnings:**
    * Documentation files were touched, but no `.zh.md` files: Update Chinese documentation or file Jira ticket.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on issue #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11842:
URL: https://github.com/apache/flink/pull/11842#issuecomment-617121796


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c49d20891408e825c247a91d668d525927d44c87",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161228600",
       "triggerID" : "c49d20891408e825c247a91d668d525927d44c87",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c49d20891408e825c247a91d668d525927d44c87",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7859",
       "triggerID" : "c49d20891408e825c247a91d668d525927d44c87",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c49d20891408e825c247a91d668d525927d44c87 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161228600) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7859) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on issue #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11842:
URL: https://github.com/apache/flink/pull/11842#issuecomment-617121796


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c49d20891408e825c247a91d668d525927d44c87",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161228600",
       "triggerID" : "c49d20891408e825c247a91d668d525927d44c87",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c49d20891408e825c247a91d668d525927d44c87",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7859",
       "triggerID" : "c49d20891408e825c247a91d668d525927d44c87",
       "triggerType" : "PUSH"
     }, {
       "hash" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7882",
       "triggerID" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "triggerType" : "PUSH"
     }, {
       "hash" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161289068",
       "triggerID" : "eefa417ee0e3ca8248d77c28d42c7bd48bca1306",
       "triggerType" : "PUSH"
     }, {
       "hash" : "41e342cbfadb4c60b3c816ba956f26805c5c23e9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "41e342cbfadb4c60b3c816ba956f26805c5c23e9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c49d20891408e825c247a91d668d525927d44c87 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161228600) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7859) 
   * eefa417ee0e3ca8248d77c28d42c7bd48bca1306 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161289068) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7882) 
   * 41e342cbfadb4c60b3c816ba956f26805c5c23e9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] alpinegizmo commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
alpinegizmo commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412369725



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink has to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
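
As a rough illustration of what a rolling, per-key reduce does, the following plain-Java sketch combines each incoming value with the last reduced value for its key and emits the result. It is a model under simplifying assumptions rather than Flink code: `RollingReduceModel` and its `run` method are hypothetical, and input records are represented as `{key, value}` pairs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (not Flink code) of a rolling reduce on a keyed stream.
final class RollingReduceModel {

    // Applies the reducer per key and returns the value emitted for each input record.
    static List<Integer> run(int[][] keyedValues, BinaryOperator<Integer> reducer) {
        Map<Integer, Integer> lastByKey = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (int[] kv : keyedValues) {
            // The first value for a key is stored as-is; later values are
            // combined with the previously reduced value for that key.
            Integer reduced = lastByKey.merge(kv[0], kv[1], reducer);
            emitted.add(reduced);
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[][] input = {{1, 2}, {1, 3}, {2, 10}, {1, 4}};
        System.out.println(run(input, Integer::sum));
        System.out.println(run(input, Integer::max));
    }
}
```

Swapping the reducer (here `Integer::sum` versus `Integer::max`) is all it takes to change the aggregation, which is the flexibility `reduce()` offers over the built-in aggregators.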
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried via a REST API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 
+
+Our `Deduplicator` class has two methods: `open()` and `flatMap()`. The open method establishes the
+use of managed state by defining a `ValueStateDescriptor<Boolean>`. The arguments to the constructor
+specify a name for this item of keyed state ("keyHasBeenSeen"), and provide information that can be
+used to serialize these objects (in this case, `Types.BOOLEAN`).
+
+{% highlight java %}
+public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
+    ValueState<Boolean> keyHasBeenSeen;
+
+    @Override
+    public void open(Configuration conf) {
+        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
+        keyHasBeenSeen = getRuntimeContext().getState(desc);
+    }
+
+    @Override
+    public void flatMap(Event event, Collector<Event> out) throws Exception {
+        if (keyHasBeenSeen.value() == null) {
+            out.collect(event);
+            keyHasBeenSeen.update(true);
+        }
+    }
+}
+{% endhighlight %}
+
+When the flatMap method calls `keyHasBeenSeen.value()`, Flink's runtime looks up the value of this
+piece of state _for the key in context_, and only if it is `null` does it go ahead and collect the
+event to the output. It also updates `keyHasBeenSeen` to `true` in this case. 
+
+This mechanism for accessing and updating key-partitioned state may seem rather magical, since the
+key is not explicitly visible in the implementation of our `Deduplicator`. When Flink's runtime
+calls the `open` method of our `RichFlatMapFunction`, there is no event, and thus no key in context
+at that moment. But when it calls the `flatMap` method, the key for the event being processed is
+available to the runtime, and is used behind the scenes to determine which entry in Flink's state
+backend is being operated on. 
+
+When deployed to a distributed cluster, there will be many instances of this `Deduplicator`, each of
+which will be responsible for a disjoint subset of the entire keyspace. Thus, when you see a single
+item of `ValueState`, such as
+
+{% highlight java %}
+ValueState<Boolean> keyHasBeenSeen;
+{% endhighlight %}
+
+understand that this represents not just a single Boolean, but rather a distributed, sharded, key/value store.
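
The key/value-store view can be sketched in plain Java by making the key lookup explicit. This is a single-process model for illustration only, not how Flink is implemented: `DeduplicatorModel` is a hypothetical class, events are reduced to their keys, and a `HashMap` stands in for the state backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of the Deduplicator's keyed state.
final class DeduplicatorModel {
    // Stands in for the state backend: one Boolean slot per key.
    private final Map<String, Boolean> keyHasBeenSeen = new HashMap<>();

    // Returns the keys of the events that would be emitted, in arrival order.
    List<String> process(List<String> eventKeys) {
        List<String> out = new ArrayList<>();
        for (String key : eventKeys) {
            // In Flink the runtime selects the slot for the key in context;
            // here the lookup by key is written out explicitly.
            if (keyHasBeenSeen.get(key) == null) {
                out.add(key);
                keyHasBeenSeen.put(key, true);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        DeduplicatorModel model = new DeduplicatorModel();
        System.out.println(model.process(List.of("a", "b", "a", "c", "b")));
    }
}
```

In a real deployment each parallel instance would hold only the slice of this map that belongs to its share of the keys.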
+
+### Clearing State
+
+There's a potential problem with the example above: What will happen if the key space is unbounded?
+Somewhere, Flink is storing an instance of `Boolean` for every distinct key that is used. If there's
+a bounded set of keys then this will be fine, but in applications where the set of keys is growing
+in an unbounded way, it's necessary to clear the state for keys that are no longer needed. This is
+done by calling `clear()` on the state object, as in:
+
+{% highlight java %}
+keyHasBeenSeen.clear();
+{% endhighlight %}
+
+You might want to do this, for example, after a period of inactivity for a given key. You'll see how
+to use Timers to do this when you learn about `ProcessFunction`s in the section on [event-driven
+applications]({{ site.baseurl }}{% link tutorials/event_driven.md %}#process-functions).
+
+There's also a [State Time-to-Live (TTL)]({{ site.baseurl }}{% link dev/stream/state/state.md
+%}#state-time-to-live-ttl) option that you can configure with the state descriptor that specifies
+when you want the state for stale keys to be automatically cleared.
+
+### Non-keyed State
+
+It is also possible to work with managed state in non-keyed contexts. This is sometimes called
+[operator state]({{ site.baseurl }}{% link dev/stream/state/state.md %}#operator-state). The
+interfaces involved are somewhat different, and since it is unusual for user-defined functions to
+need non-keyed state, it is not covered here. This feature is most often used in the implementation
+of sources and sinks. 
+
+{% top %}
+
+## Connected Streams
+
+Sometimes instead of applying a pre-defined transformation like this:
+
+<img src="{{ site.baseurl }}/fig/transformation.svg" alt="simple transformation" class="offset" width="45%" />
+
+you want to be able to dynamically alter some aspects of the transformation -- by streaming in
+thresholds, or rules, or other parameters. The pattern in Flink that supports this is something
+called _connected streams_, wherein a single operator has two input streams, like this:
+
+<img src="{{ site.baseurl }}/fig/connected-streams.svg" alt="connected streams" class="offset" width="45%" />
+
+Connected streams can also be used to implement streaming joins.
+
+### Example
+
+In this example a control stream is used to specify words which must be filtered out of the
+`streamOfWords`. A `RichCoFlatMapFunction` called `ControlFunction` is applied to the connected
+streams to get this done. 
+
+{% highlight java %}
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
+    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
+  
+    control
+        .connect(streamOfWords)
+        .flatMap(new ControlFunction())
+        .print();
+
+    env.execute();
+}
+{% endhighlight %}
+
+Note that the two streams being connected must be keyed in compatible ways -- either both streams
+are not keyed, or both are keyed, and if they are both keyed, the key values have to be the same. In
+this case the streams are both of type `DataStream<String>`, and both streams are keyed by the
+string. As you will see below, this `RichCoFlatMapFunction` is storing a Boolean value in keyed
+state, and this Boolean is shared by the two streams.
+
+{% highlight java %}
+public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
+    private ValueState<Boolean> blocked;
+      
+    @Override
+    public void open(Configuration config) {
+        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
+    }
+      
+    @Override
+    public void flatMap1(String control_value, Collector<String> out) throws Exception {
+        blocked.update(Boolean.TRUE);
+    }
+      
+    @Override
+    public void flatMap2(String data_value, Collector<String> out) throws Exception {
+        if (blocked.value() == null) {
+            out.collect(data_value);
+        }
+    }
+}
+{% endhighlight %}
+
+A `RichCoFlatMapFunction` is a kind of `FlatMapFunction` that can be applied to a pair of connected
+streams, and it has access to the rich function interface. This means that it can be made stateful.
+
+The `blocked` Boolean is being used to remember the keys (words, in this case) that have been
+mentioned on the `control` stream, and those words are being filtered out of the `streamOfWords`
+stream. This is _keyed_ state, and it is shared between the two streams, which is why the two
+streams have to share the same keyspace.
+
+`flatMap1` and `flatMap2` are called by the Flink runtime with elements from each of the two
+connected streams -- in our case, elements from the `control` stream are passed into `flatMap1`, and
+elements from `streamOfWords` are passed into `flatMap2`. This is determined by the order in which
+the two streams are connected with `control.connect(streamOfWords)`.
+
+It is important to recognize that you have no control over the order in which the `flatMap1` and
+`flatMap2` callbacks are called. These two input streams are racing against each other, and the
+Flink runtime will do what it wants to regarding consuming events from one stream or the other. In
+cases where timing and/or ordering matter, you may find it necessary to buffer events in managed
+Flink state until your application is ready to process them. (Note: if you are truly desperate, it
+is possible to exert some limited control over the order in which a two-input operator consumes its
+inputs by using a custom Operator that implements the
+[InputSelectable](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html)
+interface.)
+
+{% top %}
+
+## Hands-on
+
+The hands-on exercise that goes with this section is the [Rides and Fares
+Exercise](https://github.com/apache/flink-training/tree/master/rides-and-fares).

Review comment:
       Ok, I figured out how to do that.
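
The ordering caveat in the Connected Streams section can be made concrete with a minimal plain-Java sketch. It does not use any Flink API: the class `ControlFilterModel` and its methods are hypothetical stand-ins, and a `HashSet` plays the role of the keyed `blocked` state. It only illustrates that the result of the example depends on which input is consumed first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of the ControlFunction example (hypothetical, not Flink API).
public class ControlFilterModel {
    // Stands in for the keyed ValueState<Boolean>: a key is present once it is blocked.
    private final Set<String> blocked = new HashSet<>();
    private final List<String> output = new ArrayList<>();

    // Models flatMap1: an element from the control stream blocks its key.
    public void onControl(String word) {
        blocked.add(word);
    }

    // Models flatMap2: a word is emitted only if its key has not been blocked yet.
    public void onWord(String word) {
        if (!blocked.contains(word)) {
            output.add(word);
        }
    }

    public List<String> output() {
        return output;
    }

    // Feeds both inputs in one of the two extreme interleavings.
    public static List<String> run(boolean controlFirst) {
        List<String> control = Arrays.asList("DROP", "IGNORE");
        List<String> words = Arrays.asList("Apache", "DROP", "Flink", "IGNORE");
        ControlFilterModel model = new ControlFilterModel();
        if (controlFirst) {
            control.forEach(model::onControl);
            words.forEach(model::onWord);
        } else {
            words.forEach(model::onWord);
            control.forEach(model::onControl);
        }
        return model.output();
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // control elements consumed first
        System.out.println(run(false));  // words consumed first
    }
}
```

With the control elements consumed first, only `Apache` and `Flink` come out; with the words consumed first, nothing is filtered. A real job can land anywhere between these two extremes, which is why buffering in managed state is suggested when ordering matters.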







[GitHub] [flink] flinkbot commented on issue #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11842:
URL: https://github.com/apache/flink/pull/11842#issuecomment-617121796


   ## CI report:
   
   * c49d20891408e825c247a91d668d525927d44c87 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] alpinegizmo commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
alpinegizmo commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412312887



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).

Review comment:
       Good idea. Done.

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())

Review comment:
       I chose RideCleansingSolution.NYCFilter.

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />

Review comment:
       I already looked. Couldn't find one.

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);

Review comment:
       Good point. Done.
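
One way to see why the "Keys are computed" section rules out arrays as keys: Java arrays inherit identity-based `equals()` and `hashCode()` from `Object`, so two arrays with the same contents do not compare as the same key. The sketch below is plain Java with no Flink dependency, and the class name `KeyEqualityCheck` is made up for illustration. A `List` is used as the value-based counterpart here; in a Flink job a Tuple or POJO would be the more typical composite key.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, not part of Flink or the training code base.
public class KeyEqualityCheck {
    // True if two independently computed keys would be treated as the same key.
    static boolean sameKey(Object a, Object b) {
        return a.equals(b) && a.hashCode() == b.hashCode();
    }

    // Two arrays with identical contents: equals() compares references, so no match.
    public static boolean arraysMatch() {
        int[] first = {1, 2};
        int[] second = {1, 2};
        return sameKey(first, second);
    }

    // Two lists with identical contents: equals() and hashCode() are value-based.
    public static boolean listsMatch() {
        List<Integer> first = Arrays.asList(1, 2);
        List<Integer> second = Arrays.asList(1, 2);
        return sameKey(first, second);
    }

    public static void main(String[] args) {
        System.out.println("arrays match: " + arraysMatch());
        System.out.println("lists match: " + listsMatch());
    }
}
```

Because keys are recomputed whenever they are needed, a key type that fails this kind of check cannot reliably route events for the same logical key to the same state.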

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink has to keep track of the maximum duration for each distinct

Review comment:
       I'm taking all of your suggestions, but not using the suggestion feature, just in case Github gets difficult again.
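
The implicit state behind the `maxBy` example can be pictured as a map with one entry per distinct key. The following plain-Java sketch is only a mental model, not how Flink stores state internally, and `RunningMaxModel` with its method names is hypothetical. Following the tutorial's description, it reports a value only when the maximum for a key increases.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Mental model of per-key aggregation state (hypothetical, not Flink API).
public class RunningMaxModel {
    // One stored value per distinct startCell, like one piece of keyed state per key.
    private final Map<Integer, Integer> maxMinutesByCell = new HashMap<>();

    // Returns the new maximum if this ride beats the stored one for its cell, else empty.
    public Optional<Integer> update(int startCell, int minutes) {
        Integer current = maxMinutesByCell.get(startCell);
        if (current == null || minutes > current) {
            maxMinutesByCell.put(startCell, minutes);
            return Optional.of(minutes);
        }
        return Optional.empty();
    }

    // Number of keys for which state is currently being held.
    public int trackedKeys() {
        return maxMinutesByCell.size();
    }

    public static void main(String[] args) {
        RunningMaxModel model = new RunningMaxModel();
        int[][] rides = {{50797, 6}, {50797, 4}, {50797, 8}, {56031, 22}};
        for (int[] ride : rides) {
            model.update(ride[0], ride[1])
                 .ifPresent(max -> System.out.println("(" + ride[0] + "," + max + "M)"));
        }
        System.out.println("keys tracked: " + model.trackedKeys());
    }
}
```

The map grows by one entry per distinct `startCell` and never shrinks on its own, which is the reason unbounded keyspaces call for clearing state.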

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`:
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink has to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored

Review comment:
       I've done something similar

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`:
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink has to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?

Review comment:
       For the sections whose names are (or begin with) method names, I think it would be inappropriate to capitalize them. If you prefer, I'd be okay changing `map()` to Map, etc., but not `Map()`.

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`:
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink has to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried via a REST API

Review comment:
       Right.

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatmap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`:
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink is having to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried via a REST API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 

Review comment:
       In principle I like the idea of having a figure for this, but I don't have a visualization for this in mind.
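
In the absence of a figure, one rough way to picture the per-key `ValueState<Boolean>` described in the hunk above is as a map with one entry per key. The following is only a minimal, Flink-free sketch under that assumption: the class `KeyedStateSketch` and its `dedup` helper are hypothetical names made up for illustration, the `HashMap` merely stands in for Flink's state backend, and events are reduced to their keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: no Flink classes are used here.
public class KeyedStateSketch {

    // Stand-in for the state backend: one "keyHasBeenSeen" entry per key.
    private final Map<String, Boolean> keyHasBeenSeen = new HashMap<>();

    // Mirrors the shape of Deduplicator.flatMap(): emit the event only if
    // no state has been stored yet for the key in context.
    public void flatMap(String key, List<String> out) {
        if (keyHasBeenSeen.get(key) == null) {
            out.add(key);
            keyHasBeenSeen.put(key, true);
        }
    }

    // Runs a list of keys through a single operator instance, in order.
    public static List<String> dedup(List<String> keys) {
        KeyedStateSketch operator = new KeyedStateSketch();
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            operator.flatMap(key, out);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [a, b, c]
        System.out.println(dedup(Arrays.asList("a", "b", "a", "c", "b")));
    }
}
```

The difference in real Flink code is that the key never appears as an argument: the runtime supplies it implicitly from the event being processed, and the entries are spread across the parallel instances of the operator.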

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatmap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`:
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink is having to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried via a REST API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 
+
+Our `Deduplicator` class has two methods: `open()` and `flatMap()`. The open method establishes the
+use of managed state by defining a `ValueStateDescriptor<Boolean>`. The arguments to the constructor
+specify a name for this item of keyed state ("keyHasBeenSeen"), and provide information that can be
+used to serialize these objects (in this case, `Types.BOOLEAN`).
+
+{% highlight java %}
+public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
+    ValueState<Boolean> keyHasBeenSeen;
+
+    @Override
+    public void open(Configuration conf) {
+        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
+        keyHasBeenSeen = getRuntimeContext().getState(desc);
+    }
+
+    @Override
+    public void flatMap(Event event, Collector<Event> out) throws Exception {
+        if (keyHasBeenSeen.value() == null) {
+            out.collect(event);
+            keyHasBeenSeen.update(true);
+        }
+    }
+}
+{% endhighlight %}
+
+When the flatMap method calls `keyHasBeenSeen.value()`, Flink's runtime looks up the value of this
+piece of state _for the key in context_, and only if it is `null` does it go ahead and collect the
+event to the output. It also updates `keyHasBeenSeen` to `true` in this case. 
+
+This mechanism for accessing and updating key-partitioned state may seem rather magical, since the
+key is not explicitly visible in the implementation of our `Deduplicator`. When Flink's runtime
+calls the `open` method of our `RichFlatMapFunction`, there is no event, and thus no key in context
+at that moment. But when it calls the `flatMap` method, the key for the event being processed is
+available to the runtime, and is used behind the scenes to determine which entry in Flink's state
+backend is being operated on. 
+
+When deployed to a distributed cluster, there will be many instances of this `Deduplicator`, each of
+which will be responsible for a disjoint subset of the entire keyspace. Thus, when you see a single
+item of `ValueState`, such as
+
+{% highlight java %}
+ValueState<Boolean> keyHasBeenSeen;
+{% endhighlight %}
+
+understand that this represents not just a single Boolean, but rather a distributed, sharded, key/value store.
+
+### Clearing State
+
+There's a potential problem with the example above: What will happen if the key space is unbounded?
+Flink is storing somewhere an instance of `Boolean` for every distinct key that is used. If there's
+a bounded set of keys then this will be fine, but in applications where the set of keys is growing
+in an unbounded way, it's necessary to clear the state for keys that are no longer needed. This is
+done by calling `clear()` on the state object, as in:
+
+{% highlight java %}
+keyHasBeenSeen.clear()
+{% endhighlight %}
+
+You might want to do this, for example, after a period of inactivity for a given key. You'll see how
+to use Timers to do this when you learn about `ProcessFunction`s in the section on [event-driven
+applications]({{ site.baseurl }}{% link tutorials/event_driven.md %}#process-functions).
+
+There's also a [State Time-to-Live (TTL)]({{ site.baseurl }}{% link dev/stream/state/state.md
+%}#state-time-to-live-ttl) option that you can configure with the state descriptor that specifies
+when you want the state for stale keys to be automatically cleared.
+
+### Non-keyed State
+
+It is also possible to work with managed state in non-keyed contexts. This is sometimes called
+[operator state]({{ site.baseurl }}{% link dev/stream/state/state.md %}#operator-state). The
+interfaces involved are somewhat different, and since it is unusual for user-defined functions to
+need non-keyed state, it is not covered here. This feature is most often used in the implementation
+of sources and sinks. 
+
+{% top %}
+
+## Connected Streams
+
+Sometimes instead of applying a pre-defined transformation like this:
+
+<img src="{{ site.baseurl }}/fig/transformation.svg" alt="simple transformation" class="offset" width="45%" />
+
+you want to be able to dynamically alter some aspects of the transformation -- by streaming in
+thresholds, or rules, or other parameters. The pattern in Flink that supports this is something
+called _connected streams_, wherein a single operator has two input streams, like this:
+
+<img src="{{ site.baseurl }}/fig/connected-streams.svg" alt="connected streams" class="offset" width="45%" />
+
+Connected streams can also be used to implement streaming joins.
+
+### Example
+
+In this example a control stream is used to specify words which must be filtered out of the
+`streamOfWords`. A `RichCoFlatMapFunction` called `ControlFunction` is applied to the connected
+streams to get this done. 
+
+{% highlight java %}
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
+    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
+  
+    control
+        .connect(streamOfWords)
+        .flatMap(new ControlFunction())
+        .print();
+
+    env.execute();
+}
+{% endhighlight %}
+
+Note that the two streams being connected must be keyed in compatible ways -- either both streams
+are not keyed, or both are keyed, and if they are both keyed, the key values have to be the same. In
+this case the streams are both of type `DataStream<String>`, and both streams are keyed by the
+string. As you will see below, this `RichCoFlatMapFunction` is storing a Boolean value in keyed
+state, and this Boolean is shared by the two streams.
+
+{% highlight java %}
+public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
+    private ValueState<Boolean> blocked;
+      
+    @Override
+    public void open(Configuration config) {
+        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
+    }
+      
+    @Override
+    public void flatMap1(String control_value, Collector<String> out) throws Exception {
+        blocked.update(Boolean.TRUE);
+    }
+      
+    @Override
+    public void flatMap2(String data_value, Collector<String> out) throws Exception {
+        if (blocked.value() == null) {
+            out.collect(data_value);
+        }
+    }
+}
+{% endhighlight %}
+
+A `RichCoFlatMapFunction` is a kind of `FlatMapFunction` that can be applied to a pair of connected
+streams, and it has access to the rich function interface. This means that it can be made stateful.
+
+The `blocked` Boolean is being used to remember the keys (words, in this case) that have been
+mentioned on the `control` stream, and those words are being filtered out of the `streamOfWords`
+stream. This is _keyed_ state, and it is shared between the two streams, which is why the two
+streams have to share the same keyspace.
+
+`flatMap1` and `flatMap2` are called by the Flink runtime with elements from each of the two
+connected streams -- in our case, elements from the `control` stream are passed into `flatMap1`, and
+elements from `streamOfWords` are passed into `flatMap2`. This is determined by the order in which
+the two streams are connected with `control.connect(streamOfWords)`. 
+
+It is important to recognize that you have no control over the order in which the `flatMap1` and
+`flatMap2` callbacks are called. These two input streams are racing against each other, and the
+Flink runtime will do what it wants to regarding consuming events from one stream or the other. In
+cases where timing and/or ordering matter, you may find it necessary to buffer events in managed
+Flink state until your application is ready to process them. (Note: if you are truly desperate, it
+is possible to exert some limited control over the order in which a two-input operator consumes its
+inputs by using a custom Operator that implements the
+[InputSelectable](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html)
+interface.)
+
+{% top %}
+
+## Hands-on
+
+The hands-on exercise that goes with this section is the [Rides and Fares
+Exercise](https://github.com/apache/flink-training/tree/master/rides-and-fares).
+
+{% top %}
+
+## Further Reading
+
+- [DataStream Transformations]({{ site.baseurl }}{% link dev/stream/operators/index.md %}#datastream-transformations)
+- [Stateful Stream Processing]({{ site.baseurl }}{% link concepts/stateful-stream-processing.md %})

Review comment:
       Let's use that link later; this is too early in the tutorials.

##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`:
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
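
A minimal plain-Java sketch of the zero-or-more contract described above. It uses no Flink classes: `Consumer<String>` stands in for `Collector`, and `FlatMapSketch`, `splitWords`, and `run` are hypothetical names chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for the flatMap contract; no Flink classes are used.
class FlatMapSketch {

    // Emits every whitespace-separated word of a line, so a single input
    // may produce zero, one, or many outputs.
    static void splitWords(String line, Consumer<String> out) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.accept(word);
            }
        }
    }

    // Applies splitWords to each input and gathers whatever it emits.
    static List<String> run(List<String> lines) {
        List<String> results = new ArrayList<>();
        for (String line : lines) {
            splitWords(line, results::add);
        }
        return results;
    }
}
```

An empty line contributes nothing to the result, while a line with two words contributes two elements, which is exactly the freedom `map()` does not offer.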
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
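
The requirement for valid `hashCode()` and `equals()` implementations can be checked in plain Java, without Flink. This sketch (the class and method names are hypothetical) illustrates why an array makes a poor key while a `List` with the same contents does not: arrays inherit identity-based `equals()` and `hashCode()` from `Object`.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of key equality; not part of any Flink API.
class KeyEqualitySketch {

    // Two arrays with identical contents are still different keys,
    // because equals() on an array compares object identity.
    static boolean arraysActAsSameKey() {
        int[] a = {1, 2};
        int[] b = {1, 2};
        return a.equals(b);
    }

    // Two lists with identical contents are equal and hash alike,
    // so they would be treated as the same key.
    static boolean listsActAsSameKey() {
        List<Integer> a = Arrays.asList(1, 2);
        List<Integer> b = Arrays.asList(1, 2);
        return a.equals(b) && a.hashCode() == b.hashCode();
    }
}
```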
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink is having to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
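
To make the cost of this implicit state concrete, here is a plain-Java sketch that assumes the state can be pictured as a map with one entry per distinct key. It is not how Flink stores state internally; `ImplicitStateSketch`, `observe`, and `stateSize` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java picture of the per-key state behind a rolling maximum.
class ImplicitStateSketch {

    // One entry per distinct startCell: this map plays the role of the state.
    private final Map<Integer, Integer> maxMinutesByCell = new HashMap<>();

    // Records one (startCell, minutes) pair and returns the largest
    // duration seen so far for that cell.
    int observe(int startCell, int minutes) {
        return maxMinutesByCell.merge(startCell, minutes, Integer::max);
    }

    // The number of entries grows with the number of distinct keys,
    // not with the number of events.
    int stateSize() {
        return maxMinutesByCell.size();
    }
}
```

Feeding this sketch many events for the same cell leaves `stateSize()` unchanged, but every previously unseen cell adds an entry that is never removed.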
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
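
The idea of a custom aggregation built from a reduce step can be sketched in plain Java. The class below is a hypothetical stand-in, not Flink's `reduce()`: it assumes a reducer is simply a function that combines the previous result for a key with the next value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java stand-in for a rolling reduce over keyed values; not Flink API.
class KeyedReduceSketch<K, V> {

    private final Map<K, V> accumulators = new HashMap<>();
    private final BinaryOperator<V> reducer;

    KeyedReduceSketch(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    // Folds the value into the accumulator for its key and returns the
    // updated result; the first value for a key becomes the accumulator.
    V accept(K key, V value) {
        return accumulators.merge(key, value, reducer);
    }
}
```

Constructing it with `Integer::sum` would total the minutes per cell, while `Integer::max` would reproduce a running maximum, which suggests how the built-in aggregators can be seen as special cases of a reduce.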
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried externally via the Queryable State API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 
+
+Our `Deduplicator` class has two methods: `open()` and `flatMap()`. The open method establishes the
+use of managed state by defining a `ValueStateDescriptor<Boolean>`. The arguments to the constructor
+specify a name for this item of keyed state ("keyHasBeenSeen"), and provide information that can be
+used to serialize these objects (in this case, `Types.BOOLEAN`).
+
+{% highlight java %}
+public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
+    ValueState<Boolean> keyHasBeenSeen;
+
+    @Override
+    public void open(Configuration conf) {
+        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
+        keyHasBeenSeen = getRuntimeContext().getState(desc);
+    }
+
+    @Override
+    public void flatMap(Event event, Collector<Event> out) throws Exception {
+        if (keyHasBeenSeen.value() == null) {
+            out.collect(event);
+            keyHasBeenSeen.update(true);
+        }
+    }
+}
+{% endhighlight %}
+
+When the flatMap method calls `keyHasBeenSeen.value()`, Flink's runtime looks up the value of this
+piece of state _for the key in context_, and only if it is `null` does it go ahead and collect the
+event to the output. It also updates `keyHasBeenSeen` to `true` in this case. 
+
+This mechanism for accessing and updating key-partitioned state may seem rather magical, since the
+key is not explicitly visible in the implementation of our `Deduplicator`. When Flink's runtime
+calls the `open` method of our `RichFlatMapFunction`, there is no event, and thus no key in context
+at that moment. But when it calls the `flatMap` method, the key for the event being processed is
+available to the runtime, and is used behind the scenes to determine which entry in Flink's state
+backend is being operated on. 
+
+When deployed to a distributed cluster, there will be many instances of this `Deduplicator`, each of
+which will be responsible for a disjoint subset of the entire keyspace. Thus, when you see a single
+item of `ValueState`, such as
+
+{% highlight java %}
+ValueState<Boolean> keyHasBeenSeen;
+{% endhighlight %}
+
+understand that this represents not just a single Boolean, but rather a distributed, sharded, key/value store.
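
One way to picture that sharded key/value store is the plain-Java sketch below. It is an analogy under stated assumptions, not Flink's implementation: each map stands for the state held by one parallel instance, and the hash-modulo routing in `shardFor` is a hypothetical simplification of how Flink assigns keys to instances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy for key-partitioned ValueState<Boolean>; not Flink API.
class ShardedDedupSketch {

    // One map per (simulated) parallel instance.
    private final List<Map<String, Boolean>> shards = new ArrayList<>();

    ShardedDedupSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Hypothetical routing: every key always lands on the same shard.
    private Map<String, Boolean> shardFor(String key) {
        return shards.get(Math.floorMod(key.hashCode(), shards.size()));
    }

    // Returns true only for the first event seen with this key,
    // mirroring the null check in Deduplicator.flatMap().
    boolean firstSeen(String key) {
        Map<String, Boolean> state = shardFor(key);
        if (state.get(key) == null) {
            state.put(key, true);
            return true;
        }
        return false;
    }

    // Forgets the key, so that its next event counts as new again.
    void clear(String key) {
        shardFor(key).remove(key);
    }
}
```

The caller never names a shard, just as `Deduplicator` never names a key: the routing happens behind the scenes.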
+
+### Clearing State
+
+There's a potential problem with the example above: What will happen if the key space is unbounded?
+Somewhere, Flink is storing an instance of `Boolean` for every distinct key that is used. If there's
+a bounded set of keys then this will be fine, but in applications where the set of keys is growing
+in an unbounded way, it's necessary to clear the state for keys that are no longer needed. This is
+done by calling `clear()` on the state object, as in:
+
+{% highlight java %}
+keyHasBeenSeen.clear();
+{% endhighlight %}
+
+You might want to do this, for example, after a period of inactivity for a given key. You'll see how
+to use Timers to do this when you learn about `ProcessFunction`s in the section on [event-driven
+applications]({{ site.baseurl }}{% link tutorials/event_driven.md %}#process-functions).
+
+There's also a [State Time-to-Live (TTL)]({{ site.baseurl }}{% link dev/stream/state/state.md
+%}#state-time-to-live-ttl) option that you can configure with the state descriptor that specifies
+when you want the state for stale keys to be automatically cleared.
+
+### Non-keyed State
+
+It is also possible to work with managed state in non-keyed contexts. This is sometimes called
+[operator state]({{ site.baseurl }}{% link dev/stream/state/state.md %}#operator-state). The
+interfaces involved are somewhat different, and since it is unusual for user-defined functions to
+need non-keyed state, it is not covered here. This feature is most often used in the implementation
+of sources and sinks. 
+
+{% top %}
+
+## Connected Streams
+
+Sometimes instead of applying a pre-defined transformation like this:
+
+<img src="{{ site.baseurl }}/fig/transformation.svg" alt="simple transformation" class="offset" width="45%" />
+
+you want to be able to dynamically alter some aspects of the transformation -- by streaming in
+thresholds, or rules, or other parameters. The pattern in Flink that supports this is something
+called _connected streams_, wherein a single operator has two input streams, like this:
+
+<img src="{{ site.baseurl }}/fig/connected-streams.svg" alt="connected streams" class="offset" width="45%" />
+
+Connected streams can also be used to implement streaming joins.
+
+### Example
+
+In this example a control stream is used to specify words which must be filtered out of the
+`streamOfWords`. A `RichCoFlatMapFunction` called `ControlFunction` is applied to the connected
+streams to get this done. 
+
+{% highlight java %}
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
+    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
+  
+    control
+        .connect(streamOfWords)
+        .flatMap(new ControlFunction())
+        .print();
+
+    env.execute();
+}
+{% endhighlight %}
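
The body of `ControlFunction` is not shown at this point, so the following is only a plain-Java sketch of what a two-input function of this kind might do. It assumes the function remembers each control word and suppresses matching words from the other input. `TwoInputSketch`, `onControl`, and `onWord` are hypothetical names, and a `Set` stands in for per-key state.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Plain-Java sketch of a two-input filter; not Flink's CoFlatMapFunction.
class TwoInputSketch {

    // Words received on the control input so far.
    private final Set<String> blocked = new HashSet<>();

    // Handles an element from the control input.
    void onControl(String word) {
        blocked.add(word);
    }

    // Handles an element from the word input: the word passes through
    // unless a matching control word has already arrived.
    Optional<String> onWord(String word) {
        return blocked.contains(word) ? Optional.empty() : Optional.of(word);
    }
}
```

The result depends on arrival order: a word that reaches `onWord` before its control word reaches `onControl` is passed through, which is why timing between the two inputs matters.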
+
+Note that the two streams being connected must be keyed in compatible ways -- either both streams
+are not keyed, or both are keyed, and if they are both keyed, the key values have to be the same. In

Review comment:
       The non-keyed case strikes me as being weird, and not worth bringing up at this point in the docs. I've removed it, and expanded the explanation for the keyed case.







[GitHub] [flink] NicoK commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
NicoK commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412303493



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`:
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink is having to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried externally via the Queryable State API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 
+
+Our `Deduplicator` class has two methods: `open()` and `flatMap()`. The open method establishes the
+use of managed state by defining a `ValueStateDescriptor<Boolean>`. The arguments to the constructor
+specify a name for this item of keyed state ("keyHasBeenSeen"), and provide information that can be
+used to serialize these objects (in this case, `Types.BOOLEAN`).
+
+{% highlight java %}
+public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
+    ValueState<Boolean> keyHasBeenSeen;
+
+    @Override
+    public void open(Configuration conf) {
+        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
+        keyHasBeenSeen = getRuntimeContext().getState(desc);
+    }
+
+    @Override
+    public void flatMap(Event event, Collector<Event> out) throws Exception {
+        if (keyHasBeenSeen.value() == null) {
+            out.collect(event);
+            keyHasBeenSeen.update(true);
+        }
+    }
+}
+{% endhighlight %}
+
+When the flatMap method calls `keyHasBeenSeen.value()`, Flink's runtime looks up the value of this
+piece of state _for the key in context_, and only if it is `null` does it go ahead and collect the
+event to the output. It also updates `keyHasBeenSeen` to `true` in this case. 
+
+This mechanism for accessing and updating key-partitioned state may seem rather magical, since the
+key is not explicitly visible in the implementation of our `Deduplicator`. When Flink's runtime
+calls the `open` method of our `RichFlatMapFunction`, there is no event, and thus no key in context
+at that moment. But when it calls the `flatMap` method, the key for the event being processed is
+available to the runtime, and is used behind the scenes to determine which entry in Flink's state
+backend is being operated on. 
+
+When deployed to a distributed cluster, there will be many instances of this `Deduplicator`, each of
+which will be responsible for a disjoint subset of the entire keyspace. Thus, when you see a single
+item of `ValueState`, such as
+
+{% highlight java %}
+ValueState<Boolean> keyHasBeenSeen;
+{% endhighlight %}
+
+understand that this represents not just a single Boolean, but rather a distributed, sharded, key/value store.
+
+### Clearing State
+
+There's a potential problem with the example above: What will happen if the key space is unbounded?
+Flink stores an instance of `Boolean` somewhere for every distinct key that is used. If there's
+a bounded set of keys then this will be fine, but in applications where the set of keys is growing
+in an unbounded way, it's necessary to clear the state for keys that are no longer needed. This is
+done by calling `clear()` on the state object, as in:
+
+{% highlight java %}
+keyHasBeenSeen.clear();
+{% endhighlight %}
+
+You might want to do this, for example, after a period of inactivity for a given key. You'll see how
+to use Timers to do this when you learn about `ProcessFunction`s in the section on [event-driven
+applications]({{ site.baseurl }}{% link tutorials/event_driven.md %}#process-functions).
+
+There's also a [State Time-to-Live (TTL)]({{ site.baseurl }}{% link dev/stream/state/state.md
+%}#state-time-to-live-ttl) option that you can configure with the state descriptor that specifies
+when you want the state for stale keys to be automatically cleared.
+
+### Non-keyed State
+
+It is also possible to work with managed state in non-keyed contexts. This is sometimes called
+[operator state]({{ site.baseurl }}{% link dev/stream/state/state.md %}#operator-state). The
+interfaces involved are somewhat different, and since it is unusual for user-defined functions to
+need non-keyed state, it is not covered here. This feature is most often used in the implementation
+of sources and sinks. 
+
+{% top %}
+
+## Connected Streams
+
+Sometimes instead of applying a pre-defined transformation like this:
+
+<img src="{{ site.baseurl }}/fig/transformation.svg" alt="simple transformation" class="offset" width="45%" />
+
+you want to be able to dynamically alter some aspects of the transformation -- by streaming in
+thresholds, or rules, or other parameters. The pattern in Flink that supports this is something
+called _connected streams_, wherein a single operator has two input streams, like this:
+
+<img src="{{ site.baseurl }}/fig/connected-streams.svg" alt="connected streams" class="offset" width="45%" />
+
+Connected streams can also be used to implement streaming joins.
+
+### Example
+
+In this example a control stream is used to specify words which must be filtered out of the
+`streamOfWords`. A `RichCoFlatMapFunction` called `ControlFunction` is applied to the connected
+streams to get this done. 
+
+{% highlight java %}
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
+    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
+  
+    control
+        .connect(streamOfWords)
+        .flatMap(new ControlFunction())
+        .print();
+
+    env.execute();
+}
+{% endhighlight %}
+
+Note that the two streams being connected must be keyed in compatible ways -- either both streams
+are not keyed, or both are keyed, and if they are both keyed, the key values have to be the same. In

Review comment:
       This applies similarly to non-keyed streams though; not sure what to do about that...
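
To make the keyed case concrete, here is a minimal plain-Java sketch (no Flink dependency) of why both inputs need to be keyed the same way. The class `ConnectedStreamsSketch` and its methods are hypothetical names invented for this illustration, the `HashMap` merely stands in for per-key managed state, and the sketch assumes all control elements arrive before the words, which Flink itself does not guarantee.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a two-input operator on keyed streams (not the Flink API).
public class ConnectedStreamsSketch {
    // Stands in for a keyed ValueState<Boolean>: one entry per key.
    private final Map<String, Boolean> blocked = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // First input: a control word marks its own key as blocked.
    public void onControl(String controlWord) {
        blocked.put(controlWord, Boolean.TRUE);
    }

    // Second input: a word is emitted only if nothing was stored under its key.
    public void onWord(String word) {
        if (blocked.get(word) == null) {
            output.add(word);
        }
    }

    // Feeds all control elements first, then all words (an ordering assumption).
    public static List<String> run(List<String> control, List<String> words) {
        ConnectedStreamsSketch op = new ConnectedStreamsSketch();
        control.forEach(op::onControl);
        words.forEach(op::onWord);
        return op.output;
    }

    public static void main(String[] args) {
        // prints [Apache, Flink]
        System.out.println(run(List.of("DROP", "IGNORE"),
                               List.of("Apache", "DROP", "Flink", "IGNORE")));
    }
}
```

The lookup in `onWord` only finds what `onControl` stored because both inputs use the word itself as the key. If the two streams were keyed differently, the entry written for a control element would live under a key that the word stream never reads.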







[GitHub] [flink] NicoK commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
NicoK commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412865741



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink is having to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried externally via the Queryable State API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 

Review comment:
       I had something like this in mind, maybe also showing a concrete example with a few system and user keys for `MapState`...
   
   ```
   key -> ValueState<Type>
   key -> ListState<Type>
   key -> MapState<Key, Value>
   ```
   
   Let's do this as an extension (not in this PR)
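
As a rough illustration of the three shapes listed above, the following plain-Java sketch (no Flink dependency) models each kind of keyed state as a map from the stream key to the stored value. `KeyedStateShapes`, `record`, `demo`, and the sample keys are hypothetical names made up for this sketch. It assumes "system key" means the key chosen by `keyBy` and "user key" means the key inside a `MapState`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyed state shapes (not the Flink API).
public class KeyedStateShapes {
    // key -> ValueState<Boolean>: a single object per stream key
    public final Map<String, Boolean> valueState = new HashMap<>();
    // key -> ListState<Long>: a list of objects per stream key
    public final Map<String, List<Long>> listState = new HashMap<>();
    // key -> MapState<String, Integer>: a nested map per stream key
    public final Map<String, Map<String, Integer>> mapState = new HashMap<>();

    // streamKey plays the role of the keyBy key; userKey is the key inside the MapState.
    public void record(String streamKey, long timestamp, String userKey) {
        valueState.put(streamKey, Boolean.TRUE);
        listState.computeIfAbsent(streamKey, k -> new ArrayList<>()).add(timestamp);
        mapState.computeIfAbsent(streamKey, k -> new HashMap<>())
                .merge(userKey, 1, Integer::sum);
    }

    // Builds a small example with two stream keys and two user keys.
    public static KeyedStateShapes demo() {
        KeyedStateShapes s = new KeyedStateShapes();
        s.record("sensor-1", 10L, "temp");
        s.record("sensor-1", 20L, "temp");
        s.record("sensor-2", 15L, "humidity");
        return s;
    }
}
```

After `demo()`, the entry for `sensor-1` holds the timestamps `[10, 20]` in the list shape and a count of 2 under the user key `temp` in the map shape, while `sensor-2` has its own independent entries.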







[GitHub] [flink] NicoK commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
NicoK commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412343632



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink is having to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried externally via the Queryable State API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 
+
+Our `Deduplicator` class has two methods: `open()` and `flatMap()`. The open method establishes the
+use of managed state by defining a `ValueStateDescriptor<Boolean>`. The arguments to the constructor
+specify a name for this item of keyed state ("keyHasBeenSeen"), and provide information that can be
+used to serialize these objects (in this case, `Types.BOOLEAN`).
+
+{% highlight java %}
+public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
+    ValueState<Boolean> keyHasBeenSeen;
+
+    @Override
+    public void open(Configuration conf) {
+        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
+        keyHasBeenSeen = getRuntimeContext().getState(desc);
+    }
+
+    @Override
+    public void flatMap(Event event, Collector<Event> out) throws Exception {
+        if (keyHasBeenSeen.value() == null) {
+            out.collect(event);
+            keyHasBeenSeen.update(true);
+        }
+    }
+}
+{% endhighlight %}
+
+When the flatMap method calls `keyHasBeenSeen.value()`, Flink's runtime looks up the value of this
+piece of state _for the key in context_, and only if it is `null` does it go ahead and collect the
+event to the output. It also updates `keyHasBeenSeen` to `true` in this case. 
+
+This mechanism for accessing and updating key-partitioned state may seem rather magical, since the
+key is not explicitly visible in the implementation of our `Deduplicator`. When Flink's runtime
+calls the `open` method of our `RichFlatMapFunction`, there is no event, and thus no key in context
+at that moment. But when it calls the `flatMap` method, the key for the event being processed is
+available to the runtime, and is used behind the scenes to determine which entry in Flink's state
+backend is being operated on. 
+
+When deployed to a distributed cluster, there will be many instances of this `Deduplicator`, each of
+which will be responsible for a disjoint subset of the entire keyspace. Thus, when you see a single
+item of `ValueState`, such as
+
+{% highlight java %}
+ValueState<Boolean> keyHasBeenSeen;
+{% endhighlight %}
+
+understand that this represents not just a single Boolean, but rather a distributed, sharded, key/value store.
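As an aside to the quoted diff, here is a minimal plain-Java sketch of the idea in the paragraph above, with no Flink dependency. It assumes that a `HashMap` can stand in for the per-key store behind `ValueState<Boolean>`; the class name `KeyedDedupSketch` and the string keys are hypothetical, and a real Flink job would partition this map across parallel instances rather than hold it in one object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedDedupSketch {
    // Stand-in for Flink's keyed ValueState<Boolean>: one entry per distinct key.
    private final Map<String, Boolean> keyHasBeenSeen = new HashMap<>();

    // Mirrors the logic of Deduplicator.flatMap(): emit only the first event seen for each key.
    List<String> process(List<String> keys) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            if (keyHasBeenSeen.get(key) == null) {   // no state yet for this key
                out.add(key);
                keyHasBeenSeen.put(key, true);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedDedupSketch dedup = new KeyedDedupSketch();
        System.out.println(dedup.process(Arrays.asList("a", "b", "a", "c", "b"))); // prints [a, b, c]
    }
}
```

The map grows by one entry per distinct key and never shrinks, which is the same growth concern the tutorial raises for unbounded key spaces.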
+
+### Clearing State
+
+There's a potential problem with the example above: What will happen if the key space is unbounded?
+Flink stores an instance of `Boolean` somewhere for every distinct key that is used. If there's
+a bounded set of keys then this will be fine, but in applications where the set of keys is growing
+in an unbounded way, it's necessary to clear the state for keys that are no longer needed. This is
+done by calling `clear()` on the state object, as in:
+
+{% highlight java %}
+keyHasBeenSeen.clear();
+{% endhighlight %}
+
+You might want to do this, for example, after a period of inactivity for a given key. You'll see how
+to use Timers to do this when you learn about `ProcessFunction`s in the section on [event-driven
+applications]({{ site.baseurl }}{% link tutorials/event_driven.md %}#process-functions).
+
+There's also a [State Time-to-Live (TTL)]({{ site.baseurl }}{% link dev/stream/state/state.md
+%}#state-time-to-live-ttl) option that you can configure with the state descriptor that specifies
+when you want the state for stale keys to be automatically cleared.
+
+### Non-keyed State
+
+It is also possible to work with managed state in non-keyed contexts. This is sometimes called
+[operator state]({{ site.baseurl }}{% link dev/stream/state/state.md %}#operator-state). The
+interfaces involved are somewhat different, and since it is unusual for user-defined functions to
+need non-keyed state, it is not covered here. This feature is most often used in the implementation
+of sources and sinks. 
+
+{% top %}
+
+## Connected Streams
+
+Sometimes instead of applying a pre-defined transformation like this:
+
+<img src="{{ site.baseurl }}/fig/transformation.svg" alt="simple transformation" class="offset" width="45%" />
+
+you want to be able to dynamically alter some aspects of the transformation -- by streaming in
+thresholds, or rules, or other parameters. The pattern in Flink that supports this is something
+called _connected streams_, wherein a single operator has two input streams, like this:
+
+<img src="{{ site.baseurl }}/fig/connected-streams.svg" alt="connected streams" class="offset" width="45%" />
+
+Connected streams can also be used to implement streaming joins.
+
+### Example
+
+In this example a control stream is used to specify words which must be filtered out of the
+`streamOfWords`. A `RichCoFlatMapFunction` called `ControlFunction` is applied to the connected
+streams to get this done. 
+
+{% highlight java %}
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
+    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
+  
+    control
+        .connect(streamOfWords)
+        .flatMap(new ControlFunction())
+        .print();
+
+    env.execute();
+}
+{% endhighlight %}
+
+Note that the two streams being connected must be keyed in compatible ways -- either both streams
+are not keyed, or both are keyed, and if they are both keyed, the key values have to be the same. In
+this case the streams are both of type `DataStream<String>`, and both streams are keyed by the
+string. As you will see below, this `RichCoFlatMapFunction` is storing a Boolean value in keyed
+state, and this Boolean is shared by the two streams.
+
+{% highlight java %}
+public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
+    private ValueState<Boolean> blocked;
+      
+    @Override
+    public void open(Configuration config) {
+        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
+    }
+      
+    @Override
+    public void flatMap1(String control_value, Collector<String> out) throws Exception {
+        blocked.update(Boolean.TRUE);
+    }
+      
+    @Override
+    public void flatMap2(String data_value, Collector<String> out) throws Exception {
+        if (blocked.value() == null) {
+            out.collect(data_value);
+        }
+    }
+}
+{% endhighlight %}
+
+A `RichCoFlatMapFunction` is a kind of `FlatMapFunction` that can be applied to a pair of connected
+streams, and it has access to the rich function interface. This means that it can be made stateful.
+
+The `blocked` Boolean is being used to remember the keys (words, in this case) that have been
+mentioned on the `control` stream, and those words are being filtered out of the `streamOfWords`
+stream. This is _keyed_ state, and it is shared between the two streams, which is why the two
+streams have to share the same keyspace.
+
+`flatMap1` and `flatMap2` are called by the Flink runtime with elements from each of the two
+connected streams -- in our case, elements from the `control` stream are passed into `flatMap1`, and
+elements from `streamOfWords` are passed into `flatMap2`. This was determined by the order in which
+the two streams are connected with `control.connect(streamOfWords)`. 
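As an aside to the quoted diff, the following plain-Java sketch imitates the two callbacks without any Flink dependency. It assumes a `HashMap` as a stand-in for the keyed `blocked` state shared by both inputs; the class `ControlFunctionSketch` and its `output()` helper are hypothetical. Calling the two methods in different orders shows why the arrival order of control and data elements affects the result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ControlFunctionSketch {
    // Stand-in for the keyed ValueState<Boolean> shared by both inputs.
    private final Map<String, Boolean> blocked = new HashMap<>();
    // Stand-in for the Collector: everything that would be emitted downstream.
    private final List<String> out = new ArrayList<>();

    // Called for elements of the first (control) input.
    void flatMap1(String controlValue) {
        blocked.put(controlValue, Boolean.TRUE);
    }

    // Called for elements of the second (data) input.
    void flatMap2(String dataValue) {
        if (blocked.get(dataValue) == null) {
            out.add(dataValue);
        }
    }

    List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        // Control element arrives before the matching word: the word is filtered out.
        ControlFunctionSketch controlFirst = new ControlFunctionSketch();
        controlFirst.flatMap1("DROP");
        controlFirst.flatMap2("Apache");
        controlFirst.flatMap2("DROP");
        System.out.println(controlFirst.output()); // prints [Apache]

        // The word arrives before the control element: it slips through.
        ControlFunctionSketch dataFirst = new ControlFunctionSketch();
        dataFirst.flatMap2("DROP");
        dataFirst.flatMap1("DROP");
        System.out.println(dataFirst.output()); // prints [DROP]
    }
}
```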
+
+It is important to recognize that you have no control over the order in which the `flatMap1` and
+`flatMap2` callbacks are called. These two input streams are racing against each other, and the
+Flink runtime will do what it wants to regarding consuming events from one stream or the other. In
+cases where timing and/or ordering matter, you may find it necessary to buffer events in managed
+Flink state until your application is ready to process them. (Note: if you are truly desperate, it
+is possible to exert some limited control over the order in which a two-input operator consumes its
+inputs by using a custom Operator that implements the
+[InputSelectable](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html)

Review comment:
       shouldn't this link be dynamic with the version, e.g. relying on `{{ site.baseurl }}` or so?







[GitHub] [flink] NicoK commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
NicoK commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412345152



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every keyBy causes a network shuffle that repartitions the stream. In general this is pretty
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, "startCell". This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed KeySelector, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead, 
+compute the key in whatever way you want, so long as the resulting key is deterministic,
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
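As an aside to the quoted diff, this plain-Java sketch mimics the sample output above without any Flink dependency. It assumes a `HashMap` holding the longest duration per `startCell`, and it reports a record only when a new maximum is reached, matching the description in the text; it makes no claim about exactly when Flink's `maxBy()` emits. The class `RollingMaxSketch`, the method `onRide`, and the duration 7 are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class RollingMaxSketch {
    // Stand-in for the per-key state: the longest duration (minutes) seen so far per start cell.
    private final Map<Integer, Integer> maxMinutesByCell = new HashMap<>();

    // Returns "(cell,minutesM)" when the duration is a new maximum for the cell, otherwise null.
    String onRide(int startCell, int minutes) {
        Integer currentMax = maxMinutesByCell.get(startCell);
        if (currentMax == null || minutes > currentMax) {
            maxMinutesByCell.put(startCell, minutes);
            return "(" + startCell + "," + minutes + "M)";
        }
        return null;
    }

    public static void main(String[] args) {
        RollingMaxSketch sketch = new RollingMaxSketch();
        int[] durations = {6, 8, 7, 11, 12}; // 7 is invented to show a ride that is not a new maximum
        for (int d : durations) {
            String record = sketch.onRide(50797, d);
            if (record != null) {
                System.out.println(record);
            }
        }
    }
}
```

One map entry is kept per distinct cell, so the size of this state is bounded only if the set of cells is.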
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink is having to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general purpose `reduce()` function that you can use to
+implement your own custom aggregations.
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?
+
+Your applications are certainly capable of using state without getting Flink involved in managing it
+-- but Flink offers some compelling features for the state it manages:
+
+* local: Flink state is kept local to the machine that processes it, and can be accessed at memory speed
+* durable: Flink state is automatically checkpointed and restored
+* vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
+* horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
+* queryable: Flink state can be queried via a REST API
+
+In this section you will learn how to work with Flink's APIs that manage keyed state.
+
+### Rich Functions
+
+At this point you've already seen several of Flink's function interfaces, including
+`FilterFunction`, `MapFunction`, and `FlatMapFunction`. These are all examples of the Single
+Abstract Method pattern.
+
+For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
+`RichFlatMapFunction`, which has some additional methods, including:
+
+- `open(Configuration c)`
+- `close()`
+- `getRuntimeContext()`
+
+`open()` is called once, during operator initialization. This is an opportunity to load some static
+data, or to open a connection to an external service, for example.
+
+`getRuntimeContext()` provides access to a whole suite of potentially interesting things, but most
+notably it is how you can create and access state managed by Flink.
+
+### An Example with Keyed State
+
+In this example, imagine you have a stream of events that you want to de-duplicate, so that you only
+keep the first event with each key. Here's an application that does that, using a
+`RichFlatMapFunction` called `Deduplicator`:
+
+{% highlight java %}
+private static class Event {
+    public final String key;
+    public final long timestamp;
+    ...
+}
+
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+    env.addSource(new EventSource())
+        .keyBy(e -> e.key)
+        .flatMap(new Deduplicator())
+        .print();
+  
+    env.execute();
+}
+{% endhighlight %}
+
+To accomplish this, `Deduplicator` will need to somehow remember, for each key, whether or not there
+has already been an event for that key. It will do so using Flink's _keyed state_ interface.
+
+When you are working with a keyed stream like this one, Flink will maintain a key/value store for
+each item of state being managed.
+
+Flink supports several different types of keyed state, and this example uses the simplest one,
+namely `ValueState`. This means that _for each key_, Flink will store a single object -- in this
+case, an object of type `Boolean`. 
+
+Our `Deduplicator` class has two methods: `open()` and `flatMap()`. The open method establishes the
+use of managed state by defining a `ValueStateDescriptor<Boolean>`. The arguments to the constructor
+specify a name for this item of keyed state ("keyHasBeenSeen"), and provide information that can be
+used to serialize these objects (in this case, `Types.BOOLEAN`).
+
+{% highlight java %}
+public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
+    ValueState<Boolean> keyHasBeenSeen;
+
+    @Override
+    public void open(Configuration conf) {
+        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
+        keyHasBeenSeen = getRuntimeContext().getState(desc);
+    }
+
+    @Override
+    public void flatMap(Event event, Collector<Event> out) throws Exception {
+        if (keyHasBeenSeen.value() == null) {
+            out.collect(event);
+            keyHasBeenSeen.update(true);
+        }
+    }
+}
+{% endhighlight %}
+
+When the flatMap method calls `keyHasBeenSeen.value()`, Flink's runtime looks up the value of this
+piece of state _for the key in context_, and only if it is `null` does it go ahead and collect the
+event to the output. It also updates `keyHasBeenSeen` to `true` in this case. 
+
+This mechanism for accessing and updating key-partitioned state may seem rather magical, since the
+key is not explicitly visible in the implementation of our `Deduplicator`. When Flink's runtime
+calls the `open` method of our `RichFlatMapFunction`, there is no event, and thus no key in context
+at that moment. But when it calls the `flatMap` method, the key for the event being processed is
+available to the runtime, and is used behind the scenes to determine which entry in Flink's state
+backend is being operated on. 
+
+When deployed to a distributed cluster, there will be many instances of this `Deduplicator`, each of
+which will be responsible for a disjoint subset of the entire keyspace. Thus, when you see a single
+item of `ValueState`, such as
+
+{% highlight java %}
+ValueState<Boolean> keyHasBeenSeen;
+{% endhighlight %}
+
+understand that this represents not just a single Boolean, but rather a distributed, sharded, key/value store.
+
+### Clearing State
+
+There's a potential problem with the example above: What will happen if the key space is unbounded?
+Flink stores an instance of `Boolean` somewhere for every distinct key that is used. If there's
+a bounded set of keys then this will be fine, but in applications where the set of keys is growing
+in an unbounded way, it's necessary to clear the state for keys that are no longer needed. This is
+done by calling `clear()` on the state object, as in:
+
+{% highlight java %}
+keyHasBeenSeen.clear();
+{% endhighlight %}
+
+You might want to do this, for example, after a period of inactivity for a given key. You'll see how
+to use Timers to do this when you learn about `ProcessFunction`s in the section on [event-driven
+applications]({{ site.baseurl }}{% link tutorials/event_driven.md %}#process-functions).
+
+There's also a [State Time-to-Live (TTL)]({{ site.baseurl }}{% link dev/stream/state/state.md
+%}#state-time-to-live-ttl) option that you can configure with the state descriptor that specifies
+when you want the state for stale keys to be automatically cleared.
+
+### Non-keyed State
+
+It is also possible to work with managed state in non-keyed contexts. This is sometimes called
+[operator state]({{ site.baseurl }}{% link dev/stream/state/state.md %}#operator-state). The
+interfaces involved are somewhat different, and since it is unusual for user-defined functions to
+need non-keyed state, it is not covered here. This feature is most often used in the implementation
+of sources and sinks. 
+
+{% top %}
+
+## Connected Streams
+
+Sometimes instead of applying a pre-defined transformation like this:
+
+<img src="{{ site.baseurl }}/fig/transformation.svg" alt="simple transformation" class="offset" width="45%" />
+
+you want to be able to dynamically alter some aspects of the transformation -- by streaming in
+thresholds, or rules, or other parameters. The pattern in Flink that supports this is something
+called _connected streams_, wherein a single operator has two input streams, like this:
+
+<img src="{{ site.baseurl }}/fig/connected-streams.svg" alt="connected streams" class="offset" width="45%" />
+
+Connected streams can also be used to implement streaming joins.
+
+### Example
+
+In this example a control stream is used to specify words which must be filtered out of the
+`streamOfWords`. A `RichCoFlatMapFunction` called `ControlFunction` is applied to the connected
+streams to get this done. 
+
+{% highlight java %}
+public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
+    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
+  
+    control
+        .connect(streamOfWords)
+        .flatMap(new ControlFunction())
+        .print();
+
+    env.execute();
+}
+{% endhighlight %}
+
+Note that the two streams being connected must be keyed in compatible ways -- either both streams
+are not keyed, or both are keyed, and if they are both keyed, the key values have to be the same. In
+this case the streams are both of type `DataStream<String>`, and both streams are keyed by the
+string. As you will see below, this `RichCoFlatMapFunction` is storing a Boolean value in keyed
+state, and this Boolean is shared by the two streams.
+
+{% highlight java %}
+public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
+    private ValueState<Boolean> blocked;
+      
+    @Override
+    public void open(Configuration config) {
+        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
+    }
+      
+    @Override
+    public void flatMap1(String control_value, Collector<String> out) throws Exception {
+        blocked.update(Boolean.TRUE);
+    }
+      
+    @Override
+    public void flatMap2(String data_value, Collector<String> out) throws Exception {
+        if (blocked.value() == null) {
+            out.collect(data_value);
+        }
+    }
+}
+{% endhighlight %}
+
+A `RichCoFlatMapFunction` is a kind of `FlatMapFunction` that can be applied to a pair of connected
+streams, and it has access to the rich function interface. This means that it can be made stateful.
+
+The `blocked` Boolean is being used to remember the keys (words, in this case) that have been
+mentioned on the `control` stream, and those words are being filtered out of the `streamOfWords`
+stream. This is _keyed_ state, and it is shared between the two streams, which is why the two
+streams have to share the same keyspace.
+
+`flatMap1` and `flatMap2` are called by the Flink runtime with elements from each of the two
+connected streams -- in our case, elements from the `control` stream are passed into `flatMap1`, and
+elements from `streamOfWords` are passed into `flatMap2`. This was determined by the order in which
+the two streams are connected with `control.connect(streamOfWords)`. 
+
+It is important to recognize that you have no control over the order in which the `flatMap1` and
+`flatMap2` callbacks are called. These two input streams are racing against each other, and the
+Flink runtime will do what it wants to regarding consuming events from one stream or the other. In
+cases where timing and/or ordering matter, you may find it necessary to buffer events in managed
+Flink state until your application is ready to process them. (Note: if you are truly desperate, it
+is possible to exert some limited control over the order in which a two-input operator consumes its
+inputs by using a custom Operator that implements the
+[InputSelectable](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html)
+interface.)
+
+{% top %}
+
+## Hands-on
+
+The hands-on exercise that goes with this section is the [Rides and Fares
+Exercise](https://github.com/apache/flink-training/tree/master/rides-and-fares).

Review comment:
       The operations playground instructions at [1] try to be clever and use a URL that matches the documentation version, but that approach breaks for `master`. It would still be nice to have something like this here (if not now, then please open a ticket for it).
   
   [1] https://ci.apache.org/projects/flink/flink-docs-master/getting-started/docker-playgrounds/flink-operations-playground.html#starting-the-playground







[GitHub] [flink] NicoK commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
NicoK commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412863981



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+---
+title: Data Pipelines & ETL
+nav-id: etl
+nav-pos: 3
+nav-title: Data Pipelines & ETL
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Stateless Transformations
+
+The examples in this section assume you are familiar with the Taxi Ride data used in the hands-on
+exercises in the [flink-training repo](https://github.com/apache/flink-training).
+
+### `map()`
+
+In the first exercise you filtered a stream of taxi ride events. In that same code base there's a
+`GeoUtils` class that provides a static method `GeoUtils.mapToGridCell(float lon, float lat)` which
+maps a location (longitude, latitude) to a grid cell that refers to an area that is approximately
+100x100 meters in size.
+
+Now let's enrich our stream of taxi ride objects by adding `startCell` and `endCell` fields to each
+event. You can create an `EnrichedRide` object that extends `TaxiRide`, adding these fields:
+
+{% highlight java %}
+public static class EnrichedRide extends TaxiRide {
+    public int startCell;
+    public int endCell;
+
+    public EnrichedRide() {}
+
+    public EnrichedRide(TaxiRide ride) {
+        this.rideId = ride.rideId;
+        this.isStart = ride.isStart;
+        ...
+        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
+        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
+    }
+
+    public String toString() {
+        return super.toString() + "," +
+            Integer.toString(this.startCell) + "," +
+            Integer.toString(this.endCell);
+    }
+}
+{% endhighlight %}
+
+You can then create an application that transforms the stream
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .filter(new RideCleansing.NYCFilter())
+    .map(new Enrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+with this `MapFunction`:
+
+{% highlight java %}
+public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
+        return new EnrichedRide(taxiRide);
+    }
+}
+{% endhighlight %}
+
+### `flatMap()`
+
+A `MapFunction` is suitable only when performing a one-to-one transformation: for each and every
+stream element coming in, `map()` will emit one transformed element. Otherwise, you'll want to use
+`flatMap()`
+
+{% highlight java %}
+DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
+
+DataStream<EnrichedRide> enrichedNYCRides = rides
+    .flatMap(new NYCEnrichment());
+
+enrichedNYCRides.print();
+{% endhighlight %}
+
+together with a `FlatMapFunction`:
+
+{% highlight java %}
+public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
+
+    @Override
+    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
+        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
+        if (valid.filter(taxiRide)) {
+            out.collect(new EnrichedRide(taxiRide));
+        }
+    }
+}
+{% endhighlight %}
+
+With the `Collector` provided in this interface, the `flatMap()` method can emit as many stream
+elements as you like, including none at all.
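
Inline note: a minimal plain-Java sketch of the zero-or-more contract described above, assuming nothing from Flink. `FlatMapSketch`, `splitEven` and the `Consumer<String>` standing in for `Collector` are illustrative names only; they are not part of the tutorial or of the Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for the flatMap contract; none of these names are Flink API.
final class FlatMapSketch {

    // Hypothetical counterpart of a FlatMapFunction<Integer, String>: it receives one
    // element plus a collector and decides how many results to hand to the collector.
    static void splitEven(int value, Consumer<String> out) {
        if (value % 2 != 0) {
            return;                  // odd input: emit nothing, which acts as a filter
        }
        out.accept(value + "a");     // even input: emit two elements
        out.accept(value + "b");
    }

    // Applies the function to every input and gathers whatever was collected.
    static List<String> run(List<Integer> input) {
        List<String> collected = new ArrayList<>();
        for (int value : input) {
            splitEven(value, collected::add);
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4)));
    }
}
```

Four inputs produce four outputs here only by coincidence: two inputs are dropped and two are doubled, which is exactly what `map()` cannot express.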
+
+{% top %}
+
+## Keyed Streams
+
+### `keyBy()`
+
+It is often very useful to be able to partition a stream around one of its attributes, so that all
+events with the same value of that attribute are grouped together. For example, suppose you wanted
+to find the longest taxi rides starting in each of the grid cells. Thinking in terms of a SQL query,
+this would mean doing some sort of GROUP BY with the `startCell`, while in Flink this is done with
+`keyBy(KeySelector)`
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy("startCell")
+{% endhighlight %}
+
+Every `keyBy` causes a network shuffle that repartitions the stream. In general this is
+expensive, since it involves network communication along with serialization and deserialization.
+
+<img src="{{ site.baseurl }}/fig/keyBy.png" alt="keyBy and network shuffle" class="offset" width="45%" />
+
+In the example above, the key has been specified by a field name, `"startCell"`. This style of key
+selection has the drawback that the compiler is unable to infer the type of the field being used for
+keying, and so Flink will pass around the key values as Tuples, which can be awkward. It is
+better to use a properly typed `KeySelector`, e.g.,
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(
+        new KeySelector<EnrichedRide, Integer>() {
+
+            @Override
+            public Integer getKey(EnrichedRide enrichedRide) throws Exception {
+                return enrichedRide.startCell;
+            }
+        })
+{% endhighlight %}
+
+which can be more succinctly expressed with a lambda:
+
+{% highlight java %}
+rides
+    .flatMap(new NYCEnrichment())
+    .keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
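
Inline note on the repartitioning that `keyBy` performs: the sketch below is a deliberately simplified model, assuming a plain `hashCode()` modulo the parallelism. Flink's actual assignment of keys to parallel instances is more involved than this, and `PartitionSketch`, `partitionFor` and `shuffle` are made-up names. It only illustrates why all events with the same key end up in the same place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified illustration of key-based partitioning; not how Flink computes it.
final class PartitionSketch {

    // Same key -> same hashCode -> same partition index, on every call.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes each key into the bucket of its partition.
    static List<List<Integer>> shuffle(List<Integer> keys, int parallelism) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Integer key : keys) {
            partitions.get(partitionFor(key, parallelism)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(shuffle(Arrays.asList(50797, 64549, 50797, 46298), 4));
    }
}
```

Both occurrences of cell 50797 land in the same bucket, so whatever runs downstream of that bucket sees every event for that key.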
+
+### Keys are computed
+
+KeySelectors aren't limited to extracting a key from your events. They can, instead,
+compute the key in whatever way you want, so long as the resulting key is deterministic
+and has valid implementations of `hashCode()` and `equals()`. This restriction rules out
+KeySelectors that generate random numbers, or that return Arrays or Enums, but you
+can have composite keys using Tuples or POJOs, for example, so long as their elements
+follow these same rules.
+
+The keys must be produced in a deterministic way, because they are recomputed whenever they
+are needed, rather than being attached to the stream records.
+
+For example, rather than creating a new EnrichedRide class with a startCell field that we then use
+as a key via 
+
+{% highlight java %}
+keyBy(enrichedRide -> enrichedRide.startCell)
+{% endhighlight %}
+
+we could do this, instead:
+
+{% highlight java %}
+keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
+{% endhighlight %}
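
Inline note on the `hashCode()`/`equals()` requirement in this section: a small plain-Java check, with no Flink involved, of why an array makes a poor key while a value-based collection does not. `KeyContractSketch` and `sameKey` are hypothetical names. Enums are, as far as I understand, excluded for a related reason: `Enum.hashCode()` is identity-based and so is not stable across JVM instances.

```java
import java.util.Arrays;

// Plain-Java check of the hashCode()/equals() requirement; no Flink involved.
final class KeyContractSketch {

    // True if two separately built keys would be treated as the same key.
    static boolean sameKey(Object a, Object b) {
        return a.equals(b) && a.hashCode() == b.hashCode();
    }

    public static void main(String[] args) {
        int[] first = {3, 7};
        int[] second = {3, 7};
        // Arrays inherit identity-based equals() from Object.
        System.out.println(sameKey(first, second));
        // Lists compare and hash element by element.
        System.out.println(sameKey(Arrays.asList(3, 7), Arrays.asList(3, 7)));
    }
}
```

The first check is false and the second is true: two arrays with identical contents are still different keys, which is what makes them unusable for grouping.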
+
+### Aggregations on Keyed Streams
+
+This bit of code creates a new stream of tuples containing the `startCell` and duration (in minutes)
+for each end-of-ride event:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+
+        @Override
+        public void flatMap(EnrichedRide ride,
+                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
+            if (!ride.isStart) {
+                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
+                Minutes duration = rideInterval.toDuration().toStandardMinutes();
+                out.collect(new Tuple2<>(ride.startCell, duration));
+            }
+        }
+    });
+{% endhighlight %}
+
+Now it is possible to produce a stream that contains only those rides that are the longest rides
+ever seen (to that point) for each `startCell`.
+
+There are a variety of ways that the field to use as the key can be expressed. Earlier you saw an
+example with an EnrichedRide POJO, where the field to use as the key was specified with its name.
+This case involves Tuple2 objects, and the index within the tuple (starting from 0) is used to
+specify the key.
+
+{% highlight java %}
+minutesByStartCell
+  .keyBy(0) // startCell
+  .maxBy(1) // duration
+  .print();
+{% endhighlight %}
+
+The output stream now contains a record for each key every time the duration reaches a new maximum -- as shown here with cell 50797:
+
+    ...
+    4> (64549,5M)
+    4> (46298,18M)
+    1> (51549,14M)
+    1> (53043,13M)
+    1> (56031,22M)
+    1> (50797,6M)
+    ...
+    1> (50797,8M)
+    ...
+    1> (50797,11M)
+    ...
+    1> (50797,12M)
+
+### (Implicit) State
+
+This is the first example in these tutorials that involves stateful streaming. Though the state is
+being handled transparently, Flink has to keep track of the maximum duration for each distinct
+key.
+
+Whenever state gets involved in your application, you should think about how large the state might
+become. Whenever the key space is unbounded, then so is the amount of state Flink will need.
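
Inline note: a hand-rolled model of the state that a per-key rolling maximum needs, assuming one stored value per distinct key. This is not Flink code, and `RollingMaxSketch`, `update` and `stateSize` are illustrative names. What gets emitted downstream is left aside; the point is only how the state grows with the key space.

```java
import java.util.HashMap;
import java.util.Map;

// Hand-rolled model of the per-key state behind a rolling maximum; not Flink code.
final class RollingMaxSketch {

    // One entry per distinct key ever seen: this map plays the role of the implicit state.
    private final Map<Integer, Integer> maxByKey = new HashMap<>();

    // Folds one (key, minutes) event into the state and returns the key's current maximum.
    int update(int key, int minutes) {
        return maxByKey.merge(key, minutes, Integer::max);
    }

    // Number of keys currently held in state.
    int stateSize() {
        return maxByKey.size();
    }

    public static void main(String[] args) {
        RollingMaxSketch state = new RollingMaxSketch();
        state.update(50797, 6);
        state.update(50797, 8);
        state.update(64549, 5);
        System.out.println(state.update(50797, 7) + " with " + state.stateSize() + " keys in state");
    }
}
```

Nothing ever removes an entry from the map, so an unbounded set of keys means an unbounded map.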
+
+When working with streams it generally makes more sense to think in terms of aggregations over
+finite windows, rather than over the entire stream.
+
+### `reduce()` and other aggregators
+
+`maxBy()`, used above, is just one example of a number of aggregator functions available on Flink's
+`KeyedStream`s. There is also a more general-purpose `reduce()` function that you can use to
+implement your own custom aggregations.
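
Inline note: a plain-Java sketch of what a custom keyed reduction amounts to, assuming the reducer combines the previous result for a key with each new value. `KeyedReduceSketch` and `accept` are made-up names and this does not use Flink's `ReduceFunction`. It only shows the shape of the computation, here summing minutes per cell rather than taking the maximum.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of a keyed rolling reduce; class and method names are made up.
final class KeyedReduceSketch<K, V> {

    private final Map<K, V> accumulators = new HashMap<>();
    private final BinaryOperator<V> reducer;

    KeyedReduceSketch(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    // Combines the new value with the key's previous result; the first value
    // seen for a key becomes its initial result.
    V accept(K key, V value) {
        return accumulators.merge(key, value, reducer);
    }

    public static void main(String[] args) {
        // Total minutes per start cell instead of the maximum.
        KeyedReduceSketch<Integer, Integer> totals = new KeyedReduceSketch<>(Integer::sum);
        totals.accept(50797, 6);
        totals.accept(64549, 5);
        System.out.println(totals.accept(50797, 8));
    }
}
```

Swapping `Integer::sum` for `Integer::max` turns the same class into the rolling maximum, which is the sense in which the reducer is the general case.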
+
+{% top %}
+
+## Stateful Transformations
+
+### Why is Flink Involved in Managing State?

Review comment:
       yes, sure (not using code and keeping code in its original form) - I was mainly thinking about the other words in the heading though.
   
   let's ignore this for now







[GitHub] [flink] flinkbot edited a comment on issue #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11842:
URL: https://github.com/apache/flink/pull/11842#issuecomment-617121796


   ## CI report:
   
   * c49d20891408e825c247a91d668d525927d44c87 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161228600) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7859) 
   * eefa417ee0e3ca8248d77c28d42c7bd48bca1306 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161289068) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7882) 
   





[GitHub] [flink] NicoK commented on a change in pull request #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
NicoK commented on a change in pull request #11842:
URL: https://github.com/apache/flink/pull/11842#discussion_r412863981



##########
File path: docs/tutorials/etl.md
##########
@@ -0,0 +1,518 @@
+### Why is Flink Involved in Managing State?

Review comment:
       yes, sure - I was mainly thinking about the other words in the heading though.
   
   let's ignore this for now







[GitHub] [flink] flinkbot edited a comment on issue #11842: [FLINK-17238][docs] Data Pipelines and ETL tutorial

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11842:
URL: https://github.com/apache/flink/pull/11842#issuecomment-617121796


   ## CI report:
   
   * c49d20891408e825c247a91d668d525927d44c87 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161228600) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7859) 
   

