You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2019/08/07 14:50:18 UTC

[flink] branch master updated: [FLINK-12749] [docs] Add operations Docker playground.

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f695a76  [FLINK-12749] [docs] Add operations Docker playground.
f695a76 is described below

commit f695a76b10b0cb5f074bbb874fe374cd11e6eff3
Author: Konstantin Knauf <kn...@gmail.com>
AuthorDate: Tue Jun 25 14:26:37 2019 +0200

    [FLINK-12749] [docs] Add operations Docker playground.
    
    * Add new Docker Playgrounds section to Getting Started documenation section
    * Add new count click events example jobs
    
    This closes #9192.
---
 docs/fig/click-event-count-example.svg             |  21 +
 docs/fig/flink-docker-playground.svg               |  21 +
 docs/fig/playground-webui-failure.png              | Bin 0 -> 37334 bytes
 docs/fig/playground-webui.png                      | Bin 0 -> 18135 bytes
 .../docker-playgrounds/flink_cluster_playground.md | 812 +++++++++++++++++++++
 .../flink_cluster_playground.zh.md                 | 774 ++++++++++++++++++++
 docs/getting-started/docker-playgrounds/index.md   |  25 +
 .../getting-started/docker-playgrounds/index.zh.md |  25 +
 flink-dist/pom.xml                                 |   7 +
 flink-dist/src/main/assemblies/bin.xml             |  11 +
 .../pom.xml                                        | 106 +++
 .../src/main/resources/META-INF/NOTICE             |   4 +-
 .../src/main/resources/META-INF/NOTICE             |   2 +-
 flink-examples/flink-examples-build-helper/pom.xml |   1 +
 flink-examples/flink-examples-streaming/pom.xml    |   3 +-
 .../statemachine/KafkaEventsGeneratorJob.java      |   4 +-
 .../examples/statemachine/StateMachineExample.java |   4 +-
 .../windowing/clickeventcount/ClickEventCount.java | 117 +++
 .../clickeventcount/ClickEventGenerator.java       | 122 ++++
 .../functions/ClickEventStatisticsCollector.java   |  47 ++
 .../functions/CountingAggregator.java              |  47 ++
 .../clickeventcount/records/ClickEvent.java        |  85 +++
 .../records/ClickEventDeserializationSchema.java   |  51 ++
 .../records/ClickEventSerializationSchema.java     |  55 ++
 .../records/ClickEventStatistics.java              | 116 +++
 .../ClickEventStatisticsSerializationSchema.java   |  55 ++
 26 files changed, 2506 insertions(+), 9 deletions(-)

diff --git a/docs/fig/click-event-count-example.svg b/docs/fig/click-event-count-example.svg
new file mode 100644
index 0000000..4d9c06f
--- /dev/null
+++ b/docs/fig/click-event-count-example.svg
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="713px" height="359px" viewBox="-0.5 -0.5 713 359" content="&lt;mxfile modified=&quot;2019-07-30T06:33:46.579Z&quot; host=&quot;www.draw.io&quot; agent=&quot;Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0&quot; etag=&quot;Gyms1__7o2-6Tou9Fwcv&quot; version=&quot;11.0.7&quot; type=&quot;device&quot;&gt;&lt;diagram id=&quot;axHalsAsTUV6G1jOH0Rx&quot; name=&quot;Page- [...]
\ No newline at end of file
diff --git a/docs/fig/flink-docker-playground.svg b/docs/fig/flink-docker-playground.svg
new file mode 100644
index 0000000..24a53e2
--- /dev/null
+++ b/docs/fig/flink-docker-playground.svg
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="681px" height="221px" viewBox="-0.5 -0.5 681 221" content="&lt;mxfile modified=&quot;2019-07-30T05:46:19.236Z&quot; host=&quot;www.draw.io&quot; agent=&quot;Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0&quot; etag=&quot;6b7qPJhosj6WVEuTns2y&quot; version=&quot;11.0.7&quot; type=&quot;device&quot;&gt;&lt;diagram id=&quot;zIUxMKcIWk6lTGESeTwo&quot; name=&quot;Page- [...]
\ No newline at end of file
diff --git a/docs/fig/playground-webui-failure.png b/docs/fig/playground-webui-failure.png
new file mode 100644
index 0000000..31968dc
Binary files /dev/null and b/docs/fig/playground-webui-failure.png differ
diff --git a/docs/fig/playground-webui.png b/docs/fig/playground-webui.png
new file mode 100644
index 0000000..3833d6d
Binary files /dev/null and b/docs/fig/playground-webui.png differ
diff --git a/docs/getting-started/docker-playgrounds/flink_cluster_playground.md b/docs/getting-started/docker-playgrounds/flink_cluster_playground.md
new file mode 100644
index 0000000..7f6ef23
--- /dev/null
+++ b/docs/getting-started/docker-playgrounds/flink_cluster_playground.md
@@ -0,0 +1,812 @@
+---
+title: "Flink Cluster Playground"
+nav-title: 'Flink Cluster Playground'
+nav-parent_id: docker-playgrounds
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+There are many ways to deploy and operate Apache Flink in various environments. Regardless of this
+variety, the fundamental building blocks of a Flink Cluster remain the same, and similar
+operational principles apply.
+
+In this playground, you will learn how to manage and run Flink Jobs. You will see how to deploy and 
+monitor an application, experience how Flink recovers from Job failure, and perform everyday 
+operational tasks like upgrades and rescaling.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Anatomy of this Playground
+
+This playground consists of a long living
+[Flink Session Cluster]({{ site.baseurl }}/concepts/glossary.html#flink-session-cluster) and a Kafka
+Cluster.
+
+A Flink Cluster always consists of a 
+[Flink Master]({{ site.baseurl }}/concepts/glossary.html#flink-master) and one or more 
+[Flink TaskManagers]({{ site.baseurl }}/concepts/glossary.html#flink-taskmanager). The Flink Master 
+is responsible for handling [Job]({{ site.baseurl }}/concepts/glossary.html#flink-job) submissions, 
+the supervision of Jobs as well as resource management. The Flink TaskManagers are the worker 
+processes and are responsible for the execution of the actual 
+[Tasks]({{ site.baseurl }}/concepts/glossary.html#task) which make up a Flink Job. In this 
+playground you will start with a single TaskManager, but scale out to more TaskManagers later. 
+Additionally, this playground comes with a dedicated *client* container, which we use to submit the 
+Flink Job initially and to perform various operational tasks later on. The *client* container is not
+needed by the Flink Cluster itself but only included for ease of use.
+
+The Kafka Cluster consists of a Zookeeper server and a Kafka Broker.
+
+<img src="{{ site.baseurl }}/fig/flink-docker-playground.svg" alt="Flink Docker Playground"
+class="offset" width="80%" />
+
+When the playground is started a Flink Job called *Flink Event Count* will be submitted to the 
+Flink Master. Additionally, two Kafka Topics *input* and *output* are created.
+
+<img src="{{ site.baseurl }}/fig/click-event-count-example.svg" alt="Click Event Count Example"
+class="offset" width="80%" />
+
+The Job consumes `ClickEvent`s from the *input* topic, each with a `timestamp` and a `page`. The 
+events are then keyed by `page` and counted in 15 second
+[windows]({{ site.baseurl }}/dev/stream/operators/windows.html). The results are written to the 
+*output* topic. 
+
+There are six different pages and we generate 1000 click events per page and 15 seconds. Hence, the 
+output of the Flink job should show 1000 views per page and window.
+
+{% top %}
+
+## Starting the Playground
+
+{% if site.version contains "SNAPSHOT" %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+  <b>Note</b>: The Apache Flink Docker images used for this playground are only available for
+  released versions of Apache Flink. Since you are currently looking at the latest SNAPSHOT
+  version of the documentation the branch referenced below will not exist. You can either change it 
+  manually or switch to the released version of the documentation via the release picker.
+</p>
+{% endif %}
+
+The playground environment is set up in just a few steps. We will walk you through the necessary 
+commands and show how to validate that everything is running correctly.
+
+We assume that you have that you have [docker](https://docs.docker.com/) (1.12+) and
+[docker-compose](https://docs.docker.com/compose/) (2.1+) installed on your machine.
+
+The required configuration files are available in the 
+[flink-playgrounds](https://github.com/apache/flink-playgrounds) repository. Check it out and spin
+up the environment:
+
+{% highlight bash %}
+git clone --branch release-{{ site.version }} git@github.com:apache/flink-playgrounds.git
+cd flink-cluster-playground
+docker-compose up -d
+{% endhighlight %}
+
+Afterwards, you can inspect the running Docker containers with the following command:
+
+{% highlight bash %}
+docker-compose ps
+
+                     Name                                    Command               State                   Ports                
+--------------------------------------------------------------------------------------------------------------------------------
+flink-cluster-playground_clickevent-generator_1   /docker-entrypoint.sh java ...   Up       6123/tcp, 8081/tcp                  
+flink-cluster-playground_client_1                 /docker-entrypoint.sh flin ...   Exit 0                                       
+flink-cluster-playground_jobmanager_1             /docker-entrypoint.sh jobm ...   Up       6123/tcp, 0.0.0.0:8081->8081/tcp    
+flink-cluster-playground_kafka_1                  start-kafka.sh                   Up       0.0.0.0:9094->9094/tcp              
+flink-cluster-playground_taskmanager_1            /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp                  
+flink-cluster-playground_zookeeper_1              /bin/sh -c /usr/sbin/sshd  ...   Up       2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
+{% endhighlight %}
+
+This indicates that the client container has successfully submitted the Flink Job (`Exit 0`) and all 
+cluster components as well as the data generator are running (`Up`).
+
+You can stop the playground environment by calling:
+
+{% highlight bash %}
+docker-compose down -v
+{% endhighlight %}
+
+## Entering the Playground
+
+There are many things you can try and check out in this playground. In the following two sections we 
+will show you how to interact with the Flink Cluster and demonstrate some of Flink's key features.
+
+### Flink WebUI
+
+The most natural starting point to observe your Flink Cluster is the Web UI exposed under 
+[http://localhost:8081](http://localhost:8081). If everything went well, you'll see that the cluster initially consists of 
+one TaskManager and executes a Job called *Click Event Count*.
+
+<img src="{{ site.baseurl }}/fig/playground-webui.png" alt="Playground Flink WebUI"
+class="offset" width="100%" />
+
+The Flink WebUI contains a lot of useful and interesting information about your Flink Cluster and 
+its Jobs (JobGraph, Metrics, Checkpointing Statistics, TaskManager Status,...). 
+
+### Logs
+
+**JobManager**
+
+The JobManager logs can be tailed via `docker-compose`.
+
+{% highlight bash %}
+docker-compose logs -f jobmanager
+{% endhighlight %}
+
+After the initial startup you should mainly see log messages for every checkpoint completion.
+
+**TaskManager**
+
+The TaskManager log can be tailed in the same way.
+{% highlight bash %}
+docker-compose logs -f taskmanager
+{% endhighlight %}
+
+After the initial startup you should mainly see log messages for every checkpoint completion.
+
+### Flink CLI
+
+The [Flink CLI]({{ site.baseurl }}/ops/cli.html) can be used from within the client container. For
+example, to print the `help` message of the Flink CLI you can run
+{% highlight bash%}
+docker-compose run --no-deps client flink --help
+{% endhighlight %}
+
+### Flink REST API
+
+The [Flink REST API]({{ site.baseurl }}/monitoring/rest_api.html#api) is exposed via
+`localhost:8081` on the host or via `jobmanager:8081` from the client container, e.g. to list all
+currently running jobs, you can run:
+{% highlight bash%}
+curl localhost:8081/jobs
+{% endhighlight %}
+
+{% if site.version contains "SNAPSHOT" %}
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+  <b>Note</b>: If the <i>curl</i> command is not available on your machine, you can run it from the client
+  container (similar to the Flink CLI):
+{% highlight bash%}
+docker-compose run --no-deps client curl jobmanager:8081/jobs 
+{% endhighlight %}  
+</p>
+{% endif %}
+
+### Kafka Topics
+
+You can look at the records that are written to the Kafka Topics by running
+{% highlight bash%}
+//input topic (1000 records/s)
+docker-compose exec kafka kafka-console-consumer.sh \
+  --bootstrap-server localhost:9092 --topic input
+
+//output topic (24 records/min)
+docker-compose exec kafka kafka-console-consumer.sh \
+  --bootstrap-server localhost:9092 --topic output
+{% endhighlight %}
+
+{%  top %}
+
+## Time to Play!
+
+Now that you learned how to interact with Flink and the Docker containers, let's have a look at 
+some common operational tasks that you can try out on our playground.
+All of these tasks are independent of each other, i.e.i you can perform them in any order. 
+Most tasks can be executed via the [CLI](#flink-cli) and the [REST API](#flink-rest-api).
+
+### Listing Running Jobs
+
+<div class="codetabs" markdown="1">
+<div data-lang="CLI" markdown="1">
+**Command**
+{% highlight bash %}
+docker-compose run --no-deps client flink list
+{% endhighlight %}
+**Expected Output**
+{% highlight plain %}
+Waiting for response...
+------------------ Running/Restarting Jobs -------------------
+16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING)
+--------------------------------------------------------------
+No scheduled jobs.
+{% endhighlight %}
+</div>
+<div data-lang="REST API" markdown="1">
+**Request**
+{% highlight bash %}
+curl localhost:8081/jobs
+{% endhighlight %}
+**Expected Response (pretty-printed)**
+{% highlight bash %}
+{
+  "jobs": [
+    {
+      "id": "<job-id>",
+      "status": "RUNNING"
+    }
+  ]
+}
+{% endhighlight %}
+</div>
+</div>
+
+The JobID is assigned to a Job upon submission and is needed to perform actions on the Job via the 
+CLI or REST API.
+
+### Observing Failure & Recovery
+
+Flink provides exactly-once processing guarantees under (partial) failure. In this playground you 
+can observe and - to some extent - verify this behavior. 
+
+#### Step 1: Observing the Output
+
+As described [above](#anatomy-of-this-playground), the events in this playground are generate such 
+that each window  contains exactly one thousand records. So, in order to verify that Flink 
+successfully recovers from a TaskManager failure without data loss or duplication you can tail the 
+output topic and check that - after recovery - all windows are present and the count is correct.
+
+For this, start reading from the *output* topic and leave this command running until after 
+recovery (Step 3).
+
+{% highlight bash%}
+docker-compose exec kafka kafka-console-consumer.sh \
+  --bootstrap-server localhost:9092 --topic output
+{% endhighlight %}
+
+#### Step 2: Introducing a Fault
+
+In order to simulate a partial failure you can kill a TaskManager. In a production setup, this 
+could correspond to a loss of the TaskManager process, the TaskManager machine or simply a transient 
+exception being thrown from the framework or user code (e.g. due to the temporary unavailability of 
+an external resource).   
+
+{% highlight bash%}
+docker-compose kill taskmanager
+{% endhighlight %}
+
+After a few seconds, Flink will notice the loss of the TaskManager, cancel the affected Job, and 
+immediately resubmit it for recovery.
+When the Job gets restarted, its tasks remain in the `SCHEDULED` state, which is indicated by the 
+purple colored squares (see screenshot below).
+
+<img src="{{ site.baseurl }}/fig/playground-webui-failure.png" alt="Playground Flink WebUI" 
+class="offset" width="100%" />
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+  <b>Note</b>: Even though the tasks of the job are in SCHEDULED state and not RUNNING yet, the overall 
+  status of a Job is shown as RUNNING.
+</p>
+
+At this point, the tasks of the Job cannot move from the `SCHEDULED` state to `RUNNING` because there
+are no resources (TaskSlots provided by TaskManagers) to the run the tasks.
+Until a new TaskManager becomes available, the Job will go through a cycle of cancellations and resubmissions.
+
+In the meantime, the data generator keeps pushing `ClickEvent`s into the *input* topic. This is 
+similar to a real production setup where data is produced while the Job to process it is down.
+
+#### Step 3: Recovery
+
+Once you restart the TaskManager, it reconnects to the Master.
+
+{% highlight bash%}
+docker-compose up -d taskmanager
+{% endhighlight %}
+
+When the Master is notified about the new TaskManager, it schedules the tasks of the 
+recovering Job to the newly available TaskSlots. Upon restart, the tasks recover their state from
+the last successful [checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html) that was taken
+before the failure and switch to the `RUNNING` state.
+
+The Job will quickly process the full backlog of input events (accumulated during the outage) 
+from Kafka and produce output at a much higher rate (> 24 records/minute) until it reaches 
+the head of the stream. In the *output* you will see that all keys (`page`s) are present for all time 
+windows and that every count is exactly one thousand. Since we are using the 
+[FlinkKafkaProducer]({{ site.baseurl }}/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance)
+in its "at-least-once" mode, there is a chance that you will see some duplicate output records.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+  <b>Note</b>: Most production setups rely on a resource manager (Kubernetes, Yarn, Mesos) to
+  automatically restart failed processes.
+</p>
+
+### Upgrading & Rescaling a Job
+
+Upgrading a Flink Job always involves two steps: First, the Flink Job is gracefully stopped with a
+[Savepoint]({{site.base_url}}/ops/state/savepoints.html). A Savepoint is a consistent snapshot of 
+the complete application state at a well-defined, globally consistent point in time (similar to a 
+checkpoint). Second, the upgraded Flink Job is started from the Savepoint. In this context "upgrade" 
+can mean different things including the following:
+
+* An upgrade to the configuration (incl. the parallelism of the Job)
+* An upgrade to the topology of the Job (added/removed Operators)
+* An upgrade to the user-defined functions of the Job
+
+Before starting with the upgrade you might want to start tailing the *output* topic, in order to 
+observe that no data is lost or corrupted in the course the upgrade. 
+
+{% highlight bash%}
+docker-compose exec kafka kafka-console-consumer.sh \
+  --bootstrap-server localhost:9092 --topic output
+{% endhighlight %}
+
+#### Step 1: Stopping the Job
+
+To gracefully stop the Job, you need to use the "stop" command of either the CLI or the REST API. 
+For this you will need the JobID of the Job, which you can obtain by 
+[listing all running Jobs](#listing-running-jobs) or from the WebUI. With the JobID you can proceed 
+to stopping the Job:
+
+<div class="codetabs" markdown="1">
+<div data-lang="CLI" markdown="1">
+**Command**
+{% highlight bash %}
+docker-compose run --no-deps client flink stop <job-id>
+{% endhighlight %}
+**Expected Output**
+{% highlight bash %}
+Suspending job "<job-id>" with a savepoint.
+Suspended job "<job-id>" with a savepoint.
+{% endhighlight %}
+
+The Savepoint has been stored to the `state.savepoint.dir` configured in the *flink-conf.yaml*, 
+which is mounted under */tmp/flink-savepoints-directory/* on your local machine. You will need the 
+path to this Savepoint in the next step. In case of the REST API this path was already part of the 
+response, you will need to have a look at the filesystem directly.
+
+**Command**
+{% highlight bash %}
+ls -lia /tmp/flink-savepoints-directory
+{% endhighlight %}
+
+**Expected Output**
+{% highlight bash %}
+total 0
+  17 drwxr-xr-x   3 root root   60 17 jul 17:05 .
+   2 drwxrwxrwt 135 root root 3420 17 jul 17:09 ..
+1002 drwxr-xr-x   2 root root  140 17 jul 17:05 savepoint-<short-job-id>-<uuid>
+{% endhighlight %}
+</div>
+ <div data-lang="REST API" markdown="1">
+ 
+ **Request**
+{% highlight bash %}
+# triggering stop
+curl -X POST localhost:8081/jobs/<job-id>/stop -d '{"drain": false}'
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "request-id": "<trigger-id>"
+}
+{% endhighlight %}
+
+**Request**
+{% highlight bash %}
+# check status of stop action and retrieve savepoint path
+ curl localhost:8081/jobs/<job-id>/savepoints/<trigger-id>
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "status": {
+    "id": "COMPLETED"
+  },
+  "operation": {
+    "location": "<savepoint-path>"
+  }
+
+{% endhighlight %}
+</div>
+</div>
+
+#### Step 2a: Restart Job without Changes
+
+You can now restart the upgraded Job from this Savepoint. For simplicity, you can start by 
+restarting it without any changes.
+
+<div class="codetabs" markdown="1">
+<div data-lang="CLI" markdown="1">
+**Command**
+{% highlight bash %}
+docker-compose run --no-deps client flink run -s <savepoint-path> \
+  -d /opt/flink/examples/streaming/ClickEventCount.jar \
+  --bootstrap.servers kafka:9092 --checkpointing --event-time
+{% endhighlight %}
+**Expected Output**
+{% highlight bash %}
+Starting execution of program
+Job has been submitted with JobID <job-id>
+{% endhighlight %}
+</div>
+<div data-lang="REST API" markdown="1">
+
+**Request**
+{% highlight bash %}
+# Uploading the JAR from the Client container
+docker-compose run --no-deps client curl -X POST -H "Expect:" \
+  -F "jarfile=@/opt/flink/examples/streaming/ClickEventCount.jar" http://jobmanager:8081/jars/upload
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
+  "status": "success"
+}
+
+{% endhighlight %}
+
+**Request**
+{% highlight bash %}
+# Submitting the Job
+curl -X POST http://localhost:8081/jars/<jar-id>/run \
+  -d '{"programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'
+{% endhighlight %}
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "jobid": "<job-id>"
+}
+{% endhighlight %}
+</div>
+</div>
+
+Once the Job is `RUNNING` again, you will see in the *output* Topic that records are produced at a 
+higher rate while the Job is processing the backlog accumulated during the outage. Additionally, 
+you will see that no data was lost during the upgrade: all windows are present with a count of 
+exactly one thousand. 
+
+#### Step 2b: Restart Job with a Different Parallelism (Rescaling)
+
+Alternatively, you could also rescale the Job from this Savepoint by passing a different parallelism
+during resubmission.
+
+<div class="codetabs" markdown="1">
+<div data-lang="CLI" markdown="1">
+**Command**
+{% highlight bash %}
+docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> \
+  -d /opt/flink/examples/streaming/ClickEventCount.jar \
+  --bootstrap.servers kafka:9092 --checkpointing --event-time
+{% endhighlight %}
+**Expected Output**
+{% highlight bash %}
+Starting execution of program
+Job has been submitted with JobID <job-id>
+{% endhighlight %}
+</div>
+<div data-lang="REST API" markdown="1">
+
+**Request**
+{% highlight bash %}
+# Uploading the JAR from the Client container
+docker-compose run --no-deps client curl -X POST -H "Expect:" \
+  -F "jarfile=@/opt/flink/examples/streaming/ClickEventCount.jar" http://jobmanager:8081/jars/upload
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
+  "status": "success"
+}
+
+{% endhighlight %}
+
+**Request**
+{% highlight bash %}
+# Submitting the Job
+curl -X POST http://localhost:8081/jars/<jar-id>/run \
+  -d '{"parallelism": 3, "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'
+{% endhighlight %}
+**Expected Response (pretty-printed**
+{% highlight json %}
+{
+  "jobid": "<job-id>"
+}
+{% endhighlight %}
+</div>
+</div>
+Now, the Job has been resubmitted, but it will not start as there are not enough TaskSlots to
+execute it with the increased parallelism (2 available, 3 needed). With
+{% highlight bash %}
+docker-compose scale taskmanager=2
+{% endhighlight %}
+you can add a second TaskManager with two TaskSlots to the Flink Cluster, which will automatically register with the 
+Flink Master. Shortly after adding the TaskManager the Job should start running again.
+
+Once the Job is "RUNNING" again, you will see in the *output* Topic that now data was lost during 
+rescaling: all windows are present with a count of exactly one thousand.
+
+### Querying the Metrics of a Job
+
+The Flink Master exposes system and user [metrics]({{ site.baseurl }}/monitoring/metrics.html)
+via its REST API.
+
+The endpoint depends on the scope of these metrics. Metrics scoped to a Job can be listed via 
+`jobs/<job-id>/metrics`. The actual value of a metric can be queried via the `get` query parameter.
+
+**Request**
+{% highlight bash %}
+curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize"
+{% endhighlight %}
+**Expected Response (pretty-printed; no placeholders)**
+{% highlight json %}
+[
+  {
+    "id": "lastCheckpointSize",
+    "value": "9378"
+  }
+]
+{% endhighlight %}
+
+The REST API can not only be used to query metrics, but you can also retrieve detailed information
+about the status of a running Job. 
+
+**Request**
+{% highlight bash %}
+# find the vertex-id of the vertex of interest
+curl localhost:8081/jobs/<jod-id>
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "jid": "<job-id>",
+  "name": "Click Event Count",
+  "isStoppable": false,
+  "state": "RUNNING",
+  "start-time": 1564467066026,
+  "end-time": -1,
+  "duration": 374793,
+  "now": 1564467440819,
+  "timestamps": {
+    "CREATED": 1564467066026,
+    "FINISHED": 0,
+    "SUSPENDED": 0,
+    "FAILING": 0,
+    "CANCELLING": 0,
+    "CANCELED": 0,
+    "RECONCILING": 0,
+    "RUNNING": 1564467066126,
+    "FAILED": 0,
+    "RESTARTING": 0
+  },
+  "vertices": [
+    {
+      "id": "<vertex-id>",
+      "name": "ClickEvent Source",
+      "parallelism": 2,
+      "status": "RUNNING",
+      "start-time": 1564467066423,
+      "end-time": -1,
+      "duration": 374396,
+      "tasks": {
+        "CREATED": 0,
+        "FINISHED": 0,
+        "DEPLOYING": 0,
+        "RUNNING": 2,
+        "CANCELING": 0,
+        "FAILED": 0,
+        "CANCELED": 0,
+        "RECONCILING": 0,
+        "SCHEDULED": 0
+      },
+      "metrics": {
+        "read-bytes": 0,
+        "read-bytes-complete": true,
+        "write-bytes": 5033461,
+        "write-bytes-complete": true,
+        "read-records": 0,
+        "read-records-complete": true,
+        "write-records": 166351,
+        "write-records-complete": true
+      }
+    },
+    {
+      "id": "<vertex-id>",
+      "name": "Timestamps/Watermarks",
+      "parallelism": 2,
+      "status": "RUNNING",
+      "start-time": 1564467066441,
+      "end-time": -1,
+      "duration": 374378,
+      "tasks": {
+        "CREATED": 0,
+        "FINISHED": 0,
+        "DEPLOYING": 0,
+        "RUNNING": 2,
+        "CANCELING": 0,
+        "FAILED": 0,
+        "CANCELED": 0,
+        "RECONCILING": 0,
+        "SCHEDULED": 0
+      },
+      "metrics": {
+        "read-bytes": 5066280,
+        "read-bytes-complete": true,
+        "write-bytes": 5033496,
+        "write-bytes-complete": true,
+        "read-records": 166349,
+        "read-records-complete": true,
+        "write-records": 166349,
+        "write-records-complete": true
+      }
+    },
+    {
+      "id": "<vertex-id>",
+      "name": "ClickEvent Counter",
+      "parallelism": 2,
+      "status": "RUNNING",
+      "start-time": 1564467066469,
+      "end-time": -1,
+      "duration": 374350,
+      "tasks": {
+        "CREATED": 0,
+        "FINISHED": 0,
+        "DEPLOYING": 0,
+        "RUNNING": 2,
+        "CANCELING": 0,
+        "FAILED": 0,
+        "CANCELED": 0,
+        "RECONCILING": 0,
+        "SCHEDULED": 0
+      },
+      "metrics": {
+        "read-bytes": 5085332,
+        "read-bytes-complete": true,
+        "write-bytes": 316,
+        "write-bytes-complete": true,
+        "read-records": 166305,
+        "read-records-complete": true,
+        "write-records": 6,
+        "write-records-complete": true
+      }
+    },
+    {
+      "id": "<vertex-id>",
+      "name": "ClickEventStatistics Sink",
+      "parallelism": 2,
+      "status": "RUNNING",
+      "start-time": 1564467066476,
+      "end-time": -1,
+      "duration": 374343,
+      "tasks": {
+        "CREATED": 0,
+        "FINISHED": 0,
+        "DEPLOYING": 0,
+        "RUNNING": 2,
+        "CANCELING": 0,
+        "FAILED": 0,
+        "CANCELED": 0,
+        "RECONCILING": 0,
+        "SCHEDULED": 0
+      },
+      "metrics": {
+        "read-bytes": 20668,
+        "read-bytes-complete": true,
+        "write-bytes": 0,
+        "write-bytes-complete": true,
+        "read-records": 6,
+        "read-records-complete": true,
+        "write-records": 0,
+        "write-records-complete": true
+      }
+    }
+  ],
+  "status-counts": {
+    "CREATED": 0,
+    "FINISHED": 0,
+    "DEPLOYING": 0,
+    "RUNNING": 4,
+    "CANCELING": 0,
+    "FAILED": 0,
+    "CANCELED": 0,
+    "RECONCILING": 0,
+    "SCHEDULED": 0
+  },
+  "plan": {
+    "jid": "<job-id>",
+    "name": "Click Event Count",
+    "nodes": [
+      {
+        "id": "<vertex-id>",
+        "parallelism": 2,
+        "operator": "",
+        "operator_strategy": "",
+        "description": "ClickEventStatistics Sink",
+        "inputs": [
+          {
+            "num": 0,
+            "id": "<vertex-id>",
+            "ship_strategy": "FORWARD",
+            "exchange": "pipelined_bounded"
+          }
+        ],
+        "optimizer_properties": {}
+      },
+      {
+        "id": "<vertex-id>",
+        "parallelism": 2,
+        "operator": "",
+        "operator_strategy": "",
+        "description": "ClickEvent Counter",
+        "inputs": [
+          {
+            "num": 0,
+            "id": "<vertex-id>",
+            "ship_strategy": "HASH",
+            "exchange": "pipelined_bounded"
+          }
+        ],
+        "optimizer_properties": {}
+      },
+      {
+        "id": "<vertex-id>",
+        "parallelism": 2,
+        "operator": "",
+        "operator_strategy": "",
+        "description": "Timestamps/Watermarks",
+        "inputs": [
+          {
+            "num": 0,
+            "id": "<vertex-id>",
+            "ship_strategy": "FORWARD",
+            "exchange": "pipelined_bounded"
+          }
+        ],
+        "optimizer_properties": {}
+      },
+      {
+        "id": "<vertex-id>",
+        "parallelism": 2,
+        "operator": "",
+        "operator_strategy": "",
+        "description": "ClickEvent Source",
+        "optimizer_properties": {}
+      }
+    ]
+  }
+}
+{% endhighlight %}
+
+Please consult the [REST API reference](https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#api)
+for a complete list of possible queries including how to query metrics of different scopes (e.g. 
+TaskManager metrics);
+
+{%  top %}
+
+## Variants
+
+You might have noticed that the *Click Event Count* was always started with `--checkpointing` and 
+`--event-time` program arguments. By omitting these in the command of the *client* container in the 
+`docker-compose.yaml`, you can change the behavior of the Job.
+
+* `--checkpointing` enables [checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html), 
+which is Flink's fault-tolerance mechanism. If you run without it and go through 
+[failure and recovery](#observing-failure--recovery), you should will see that data is actually 
+lost.
+
+* `--event-time` enables [event time semantics]({{ site.baseurl }}/dev/event_time.html) for your 
+Job. When disabled, the Job will assign events to windows based on the wall-clock time instead of 
+the timestamp of the `ClickEvent`. Consequently, the number of events per window will not be exactly
+one thousand anymore. 
diff --git a/docs/getting-started/docker-playgrounds/flink_cluster_playground.zh.md b/docs/getting-started/docker-playgrounds/flink_cluster_playground.zh.md
new file mode 100644
index 0000000..b6b299b
--- /dev/null
+++ b/docs/getting-started/docker-playgrounds/flink_cluster_playground.zh.md
@@ -0,0 +1,774 @@
+---
+title: "Flink Cluster Playground"
+nav-title: 'Flink Cluster Playground'
+nav-parent_id: docker-playgrounds
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+There are many ways to deploy and operate Apache Flink in various environments. Regardless of this
+variety, the fundamental building blocks of a Flink Cluster remain the same, and similar
+operational principles apply.
+
+In this playground, you will learn how to manage and run Flink Jobs. You will see how to deploy and 
+monitor an application, experience how Flink recovers from Job failure, and perform everyday 
+operational tasks like upgrades and rescaling.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Anatomy of this Playground
+
+This playground consists of a long living
+[Flink Session Cluster]({{ site.baseurl }}/concepts/glossary.html#flink-session-cluster) and a Kafka
+Cluster.
+
+A Flink Cluster always consists of a 
+[Flink Master]({{ site.baseurl }}/concepts/glossary.html#flink-master) and one or more 
+[Flink TaskManagers]({{ site.baseurl }}/concepts/glossary.html#flink-taskmanager). The Flink Master 
+is responsible for handling [Job]({{ site.baseurl }}/concepts/glossary.html#flink-job) submissions, 
+the supervision of Jobs as well as resource management. The Flink TaskManagers are the worker 
+processes and are responsible for the execution of the actual 
+[Tasks]({{ site.baseurl }}/concepts/glossary.html#task) which make up a Flink Job. In this 
+playground you will start with a single TaskManager, but scale out to more TaskManagers later. 
+Additionally, this playground comes with a dedicated *client* container, which we use to submit the 
+Flink Job initially and to perform various operational tasks later on. The *client* container is not
+needed by the Flink Cluster itself but only included for ease of use.
+
+The Kafka Cluster consists of a Zookeeper server and a Kafka Broker.
+
+<img src="{{ site.baseurl }}/fig/flink-docker-playground.svg" alt="Flink Docker Playground"
+class="offset" width="80%" />
+
+When the playground is started a Flink Job called *Flink Event Count* will be submitted to the 
+Flink Master. Additionally, two Kafka Topics *input* and *output* are created.
+
+<img src="{{ site.baseurl }}/fig/click-event-count-example.svg" alt="Click Event Count Example"
+class="offset" width="80%" />
+
+The Job consumes `ClickEvent`s from the *input* topic, each with a `timestamp` and a `page`. The 
+events are then keyed by `page` and counted in 15 second
+[windows]({{ site.baseurl }}/dev/stream/operators/windows.html). The results are written to the 
+*output* topic. 
+
+There are six different pages and we generate 1000 click events per page and 15 seconds. Hence, the 
+output of the Flink job should show 1000 views per page and window.
+
+{% top %}
+
+## Starting the Playground
+
+{% if site.version contains "SNAPSHOT" %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+  <b>Note</b>: The Apache Flink Docker images used for this playground are only available for
+  released versions of Apache Flink. Since you are currently looking at the latest SNAPSHOT
+  version of the documentation the branch referenced below will not exist. You can either change it 
+  manually or switch to the released version of the documentation via the release picker.
+</p>
+{% endif %}
+
+The playground environment is set up in just a few steps. We will walk you through the necessary 
+commands and show how to validate that everything is running correctly.
+
+We assume that you have that you have [docker](https://docs.docker.com/) (1.12+) and
+[docker-compose](https://docs.docker.com/compose/) (2.1+) installed on your machine.
+
+The required configuration files are available in the 
+[flink-playgrounds](https://github.com/apache/flink-playgrounds) repository. Check it out and spin
+up the environment:
+
+{% highlight bash %}
+git clone --branch release-{{ site.version }} git@github.com:apache/flink-playgrounds.git
+cd flink-cluster-playground
+docker-compose up -d
+{% endhighlight %}
+
+Afterwards, `docker-compose ps` should give you the following output:
+
+{% highlight bash %}
+                     Name                                    Command               State                   Ports                
+--------------------------------------------------------------------------------------------------------------------------------
+flink-cluster-playground_clickevent-generator_1   /docker-entrypoint.sh java ...   Up       6123/tcp, 8081/tcp                  
+flink-cluster-playground_client_1                 /docker-entrypoint.sh flin ...   Exit 0                                       
+flink-cluster-playground_jobmanager_1             /docker-entrypoint.sh jobm ...   Up       6123/tcp, 0.0.0.0:8081->8081/tcp    
+flink-cluster-playground_kafka_1                  start-kafka.sh                   Up       0.0.0.0:9094->9094/tcp              
+flink-cluster-playground_taskmanager_1            /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp                  
+flink-cluster-playground_zookeeper_1              /bin/sh -c /usr/sbin/sshd  ...   Up       2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
+{% endhighlight %}
+
+This indicates that the client container has successfully submitted the Flink Job ("Exit 0") and all 
+cluster components as well as the data generator are running ("Up").
+
+You can stop the playground environment by calling `docker-compose down -v`.
+
+## Entering the Playground
+
+There are many things you can try and check out in this playground. In the following two sections we 
+will show you how to interact with the Flink Cluster and demonstrate some of Flink's key features.
+
+### Flink WebUI
+
+The most natural starting point to observe your Flink Cluster is the Web UI exposed under 
+http://localhost:8081. If everything went well, you'll see that the cluster initially consists of 
+one TaskManager and one Job called *Click Event Count* is in "RUNNING" state.
+
+<img src="{{ site.baseurl }}/fig/playground-webui.png" alt="Playground Flink WebUI"
+class="offset" width="100%" />
+
+The Flink WebUI contains a lot of useful and interesting information about your Flink Cluster and 
+its Jobs (JobGraph, Metrics, Checkpointing Statistics, TaskManager Status,...). 
+
+### Logs
+
+**JobManager**
+
+The JobManager logs can be tailed via `docker-compose`.
+
+{% highlight bash %}
+docker-compose logs -f jobmanager
+{% endhighlight %}
+
+After the initial startup you should mainly see log messages for every checkpoint completion.
+
+**TaskManager**
+
+The TaskManager log can be tailed in the same way.
+{% highlight bash %}
+docker-compose logs -f taskmanager
+{% endhighlight %}
+
+After the initial startup you should mainly see log messages for every checkpoint completion.
+
+### Flink CLI
+
+The [Flink CLI]({{ site.baseurl }}/ops/cli.html) can be used from within the client container. For
+example, to print the `help` message of the Flink CLI you can run
+{% highlight bash%}
+docker-compose run --no-deps client flink --help
+{% endhighlight %}
+
+### Flink REST API
+
+The [Flink REST API]({{ site.baseurl }}/monitoring/rest_api.html#api) is exposed via
+`localhost:8081` on the host or via `jobmanager:8081` from the client container, e.g. to list all
+currently running jobs, you can run:
+{% highlight bash%}
+curl localhost:8081/jobs
+{% endhighlight %}
+
+{% if site.version contains "SNAPSHOT" %}
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+  <b>Note</b>: If `curl` is not available on your machine, you can run it from the *client* 
+  container (similar to the Flink CLI):
+  {% highlight bash%}
+  docker-compose run --no-deps client curl jobmanager:8081/jobs 
+  {% endhighlight %}  
+</p>
+{% endif %}
+
+### Kafka Topics
+
+To manually look at the records in the Kakfa Topics, you can run
+{% highlight bash%}
+//input topic (1000 records/s)
+docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input
+//output topic (24 records/min)
+docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
+{% endhighlight %}
+
+{%  top %}
+
+## Time to Play!
+
+This section describes some prototypical operational activities in the context of this playground. 
+They do not need to be executed in any particular order. Most of these tasks can be performed either
+via the [CLI](#flink-cli) or the [REST API](#flink-rest-api).
+
+### Listing Running Jobs
+
+<div class="codetabs" markdown="1">
+<div data-lang="CLI" markdown="1">
+**Command**
+{% highlight bash %}
+docker-compose run --no-deps client flink list
+{% endhighlight %}
+**Expected Output**
+{% highlight plain %}
+Waiting for response...
+------------------ Running/Restarting Jobs -------------------
+16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING)
+--------------------------------------------------------------
+No scheduled jobs.
+{% endhighlight %}
+</div>
+<div data-lang="REST API" markdown="1">
+**Request**
+{% highlight bash %}
+curl localhost:8081/jobs
+{% endhighlight %}
+**Expected Response (pretty-printed)**
+{% highlight bash %}
+{
+  "jobs": [
+    {
+      "id": "<job-id>",
+      "status": "RUNNING"
+    }
+  ]
+}
+{% endhighlight %}
+</div>
+</div>
+
+The JobID is assinged to a Job upon submission and is needed to perform actions on the Job via the 
+CLI or REST API.
+
+### Observing Failure & Recovery
+
+Flink provides exactly-once processing guarantees under (partial) failure. In this playground you 
+can observe and - to some extent - verify this behavior. 
+
+#### Step 1: Observing the Output
+
+As described [above](#anatomy-of-this-playground), the events in this playground are generate such 
+that each window  contains exactly one thousand records. So, in order to verify that Flink 
+successfully recovers from a TaskManager failure without data loss or duplication you can tail the 
+output topic and check that - after recovery - all windows are present and the count is correct.
+
+For this, start reading from the *output* topic and leave this command running until after 
+recovery (Step 3).
+
+{% highlight bash%}
+docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
+{% endhighlight %}
+
+#### Step 2: Introducing a Fault
+
+In order to simulate a partial failure you can kill a TaskManager. In a production setup, this 
+could correspond to a loss of the TaskManager process, the TaskManager machine or simply a transient 
+exception being thrown from the framework or user code (e.g. due to the temporary unavailability of 
+an external resource).   
+
+{% highlight bash%}
+docker-compose kill taskmanager
+{% endhighlight %}
+
+After a few seconds, you will see in the Flink WebUI that the Job failed, and has been 
+automatically resubmitted. At this point, it can not be restarted though due to the lack of 
+resources (no TaskSlots provided by TaskManagers) and will go through a cycle of cancellations and 
+resubmissions until resources become available again.
+
+<img src="{{ site.baseurl }}/fig/playground-webui-failure.png" alt="Playground Flink WebUI" 
+class="offset" width="100%" />
+
+In the meantime, the data generator keeps pushing `ClickEvent`s into the *input* topic. 
+
+#### Step 3: Recovery
+
+Once you restart the TaskManager the Job will recover from its last successful 
+[checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html) prior to the failure.
+
+{% highlight bash%}
+docker-compose up -d taskmanager
+{% endhighlight %}
+
+Once the new TaskManager has registered itself with the Flink Master, the Job will start "RUNNING" 
+again. It will then quickly process the full backlog of input events (accumulated during the outage) 
+from Kafka and produce output at a much higher rate (> 24 records/minute) until it has caught up to 
+the head of the queue. In the *output* you will see that all keys (`page`s) are present for all time 
+windows and the count is exactly one thousand. Since we are using the 
+[FlinkKafkaProducer]({{ site.baseurl }}/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance)
+in its "at-least-once" mode, there is a chance that you will see some output records twice.
+
+### Upgrading & Rescaling the Job
+
+Upgrading a Flink Job always involves two steps: First, the Flink Job is gracefully stopped with a
+[Savepoint]({{site.base_url}}/ops/state/savepoints.html). A Savepoint is a consistent snapshot of 
+the complete application state at a well-defined, globally consistent point in time (similar to a 
+checkpoint). Second, the upgraded Flink Job is started from the Savepoint. In this context "upgrade" 
+can mean different things including the following:
+
+* An upgrade to the configuration (incl. the parallelism of the Job)
+* An upgrade to the topology of the Job (added/removed Operators)
+* An upgrade to the user-defined functions of the Job
+
+Before starting with the upgrade you might want to start tailing the *output* topic, in order to 
+observe that no data is lost or corrupted in the course the upgrade. 
+
+{% highlight bash%}
+docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
+{% endhighlight %}
+
+#### Step 1: Stopping the Job
+
+To gracefully stop the Job, you need to use the "stop" command of either the CLI or the REST API. 
+For this you will need the JobID of the Job, which you can obtain by 
+[listing all running Jobs](#listing-running-jobs) or from the WebUI. With the JobID you can proceed 
+to stopping the Job:
+
+<div class="codetabs" markdown="1">
+<div data-lang="CLI" markdown="1">
+**Command**
+{% highlight bash %}
+docker-compose run --no-deps client flink stop <job-id>
+{% endhighlight %}
+**Expected Output**
+{% highlight bash %}
+Suspending job "<job-id>" with a savepoint.
+Suspended job "<job-id>" with a savepoint.
+{% endhighlight %}
+</div>
+ <div data-lang="REST API" markdown="1">
+ 
+ **Request**
+{% highlight bash %}
+# triggering stop
+curl -X POST localhost:8081/jobs/<job-id>/stop -d '{"drain": false}'
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "request-id": "<trigger-id>"
+}
+{% endhighlight %}
+
+**Request**
+{% highlight bash %}
+# check status of stop action
+ curl localhost:8081/jobs/<job-id>/savepoints/<trigger-id>
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "status": {
+    "id": "COMPLETED"
+  },
+  "operation": {
+    "location": "<savepoint-path>"
+  }
+
+{% endhighlight %}
+</div>
+</div>
+
+The Savepoint has been stored to the `state.savepoint.dir` configured in the *flink-conf.yaml*, 
+which is mounted under */tmp/flink-savepoints-directory/* on your local machine. You will need the 
+path to this Savepoint in the next step. In case of the REST API this path was already part of the 
+response, you will need to have a look at the filesystem directly.
+
+**Command**
+{% highlight bash %}
+ls -lia /tmp/flink-savepoints-directory
+{% endhighlight %}
+
+**Expected Output**
+{% highlight bash %}
+total 0
+  17 drwxr-xr-x   3 root root   60 17 jul 17:05 .
+   2 drwxrwxrwt 135 root root 3420 17 jul 17:09 ..
+1002 drwxr-xr-x   2 root root  140 17 jul 17:05 savepoint-<short-job-id>-<uuid>
+{% endhighlight %}
+
+#### Step 2a: Restart Job without Changes
+
+You can now restart the upgraded Job from this Savepoint. For simplicity, you can start by 
+restarting it without any changes.
+
+<div class="codetabs" markdown="1">
+<div data-lang="CLI" markdown="1">
+**Command**
+{% highlight bash %}
+docker-compose run --no-deps client flink run -s <savepoint-path> -d /opt/flink/examples/streaming/ClickEventCount.jar --bootstrap.servers kafka:9092 --checkpointing --event-time
+{% endhighlight %}
+**Expected Output**
+{% highlight bash %}
+Starting execution of program
+Job has been submitted with JobID <job-id>
+{% endhighlight %}
+</div>
+<div data-lang="REST API" markdown="1">
+
+**Request**
+{% highlight bash %}
+# Uploading the JAR
+curl -X POST -H "Expect:" -F "jarfile=@/opt/flink/examples/streaming/ClickEventCount.jar" http://localhost:8081/jars/upload
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
+  "status": "success"
+}
+
+{% endhighlight %}
+
+**Request**
+{% highlight bash %}
+# Submitting the Job
+curl -X POST http://localhost:8081/jars/<jar-id>/run -d {"programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}
+{% endhighlight %}
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "jobid": "<job-id>"
+}
+{% endhighlight %}
+</div>
+</div>
+
+Once the Job is "RUNNING" again, you will see in the *output* Topic that records are produced at a 
+higher rate while the Job is processing the backlog accumulated during the outage. Additionally, 
+you will see that no data was lost during the upgrade: all windows are present with a count of 
+exactly one thousand. 
+
+#### Step 2b: Restart Job with a Different Parallelism (Rescaling)
+
+Alternatively, you could also rescale the Job from this Savepoint by passing a different parallelism
+during resubmission.
+
+<div class="codetabs" markdown="1">
+<div data-lang="CLI" markdown="1">
+**Command**
+{% highlight bash %}
+docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> -d /opt/flink/examples/streaming/ClickEventCount.jar --bootstrap.servers kafka:9092 --checkpointing --event-time
+{% endhighlight %}
+**Expected Output**
+{% highlight bash %}
+Starting execution of program
+Job has been submitted with JobID <job-id>
+{% endhighlight %}
+</div>
+<div data-lang="REST API" markdown="1">
+
+**Request**
+{% highlight bash %}
+# Uploading the JAR
+curl -X POST -H "Expect:" -F "jarfile=@/opt/flink/examples/streaming/ClickEventCount.jar" http://localhost:8081/jars/upload
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
+  "status": "success"
+}
+
+{% endhighlight %}
+
+**Request**
+{% highlight bash %}
+# Submitting the Job
+curl -X POST http://localhost:8081/jars/<jar-id>/run -d {"parallelism": 3, "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}
+{% endhighlight %}
+**Expected Response (pretty-printed**
+{% highlight json %}
+{
+  "jobid": "<job-id>"
+}
+{% endhighlight %}
+</div>
+</div>
+Now, the Job has been resubmitted, but it will not start as there are not enough TaskSlots to
+execute it with the increased parallelism (1 available, 3 needed). With
+{% highlight bash %}
+docker-compose scale taskmanager=2
+{% endhighlight %}
+you can add a second TaskManager to the Flink Cluster, which will automatically register with the 
+Flink Master. Shortly after adding the TaskManager the Job should start running again.
+
+Once the Job is "RUNNING" again, you will see in the *output* Topic that now data was lost during 
+rescaling: all windows are present with a count of exactly one thousand.
+
+### Querying the Metrics of a Job
+
+The Flink Master exposes system and user [metrics]({{ site.baseurl }}/monitoring/metrics.html)
+via its REST API.
+
+The endpoint depends on the scope of these metrics. Metrics scoped to a Job can be listed via 
+`jobs/<job-id>/metrics`. The actual value of a metric can be queried via the `get` query parameter.
+
+**Request**
+{% highlight bash %}
+curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize"
+{% endhighlight %}
+**Expected Response (pretty-printed; no placeholders)**
+{% highlight json %}
+[
+  {
+    "id": "lastCheckpointSize",
+    "value": "9378"
+  }
+]
+{% endhighlight %}
+
+The REST API can not only be used to query metrics, but you can also retrieve detailed information
+about the status of a running Job. 
+
+**Request**
+{% highlight bash %}
+# find the vertex-id of the vertex of interest
+curl localhost:8081/jobs/<jod-id>
+{% endhighlight %}
+
+**Expected Response (pretty-printed)**
+{% highlight json %}
+{
+  "jid": "<job-id>",
+  "name": "Click Event Count",
+  "isStoppable": false,
+  "state": "RUNNING",
+  "start-time": 1564467066026,
+  "end-time": -1,
+  "duration": 374793,
+  "now": 1564467440819,
+  "timestamps": {
+    "CREATED": 1564467066026,
+    "FINISHED": 0,
+    "SUSPENDED": 0,
+    "FAILING": 0,
+    "CANCELLING": 0,
+    "CANCELED": 0,
+    "RECONCILING": 0,
+    "RUNNING": 1564467066126,
+    "FAILED": 0,
+    "RESTARTING": 0
+  },
+  "vertices": [
+    {
+      "id": "<vertex-id>",
+      "name": "ClickEvent Source",
+      "parallelism": 2,
+      "status": "RUNNING",
+      "start-time": 1564467066423,
+      "end-time": -1,
+      "duration": 374396,
+      "tasks": {
+        "CREATED": 0,
+        "FINISHED": 0,
+        "DEPLOYING": 0,
+        "RUNNING": 2,
+        "CANCELING": 0,
+        "FAILED": 0,
+        "CANCELED": 0,
+        "RECONCILING": 0,
+        "SCHEDULED": 0
+      },
+      "metrics": {
+        "read-bytes": 0,
+        "read-bytes-complete": true,
+        "write-bytes": 5033461,
+        "write-bytes-complete": true,
+        "read-records": 0,
+        "read-records-complete": true,
+        "write-records": 166351,
+        "write-records-complete": true
+      }
+    },
+    {
+      "id": "<vertex-id>",
+      "name": "Timestamps/Watermarks",
+      "parallelism": 2,
+      "status": "RUNNING",
+      "start-time": 1564467066441,
+      "end-time": -1,
+      "duration": 374378,
+      "tasks": {
+        "CREATED": 0,
+        "FINISHED": 0,
+        "DEPLOYING": 0,
+        "RUNNING": 2,
+        "CANCELING": 0,
+        "FAILED": 0,
+        "CANCELED": 0,
+        "RECONCILING": 0,
+        "SCHEDULED": 0
+      },
+      "metrics": {
+        "read-bytes": 5066280,
+        "read-bytes-complete": true,
+        "write-bytes": 5033496,
+        "write-bytes-complete": true,
+        "read-records": 166349,
+        "read-records-complete": true,
+        "write-records": 166349,
+        "write-records-complete": true
+      }
+    },
+    {
+      "id": "<vertex-id>",
+      "name": "ClickEvent Counter",
+      "parallelism": 2,
+      "status": "RUNNING",
+      "start-time": 1564467066469,
+      "end-time": -1,
+      "duration": 374350,
+      "tasks": {
+        "CREATED": 0,
+        "FINISHED": 0,
+        "DEPLOYING": 0,
+        "RUNNING": 2,
+        "CANCELING": 0,
+        "FAILED": 0,
+        "CANCELED": 0,
+        "RECONCILING": 0,
+        "SCHEDULED": 0
+      },
+      "metrics": {
+        "read-bytes": 5085332,
+        "read-bytes-complete": true,
+        "write-bytes": 316,
+        "write-bytes-complete": true,
+        "read-records": 166305,
+        "read-records-complete": true,
+        "write-records": 6,
+        "write-records-complete": true
+      }
+    },
+    {
+      "id": "<vertex-id>",
+      "name": "ClickEventStatistics Sink",
+      "parallelism": 2,
+      "status": "RUNNING",
+      "start-time": 1564467066476,
+      "end-time": -1,
+      "duration": 374343,
+      "tasks": {
+        "CREATED": 0,
+        "FINISHED": 0,
+        "DEPLOYING": 0,
+        "RUNNING": 2,
+        "CANCELING": 0,
+        "FAILED": 0,
+        "CANCELED": 0,
+        "RECONCILING": 0,
+        "SCHEDULED": 0
+      },
+      "metrics": {
+        "read-bytes": 20668,
+        "read-bytes-complete": true,
+        "write-bytes": 0,
+        "write-bytes-complete": true,
+        "read-records": 6,
+        "read-records-complete": true,
+        "write-records": 0,
+        "write-records-complete": true
+      }
+    }
+  ],
+  "status-counts": {
+    "CREATED": 0,
+    "FINISHED": 0,
+    "DEPLOYING": 0,
+    "RUNNING": 4,
+    "CANCELING": 0,
+    "FAILED": 0,
+    "CANCELED": 0,
+    "RECONCILING": 0,
+    "SCHEDULED": 0
+  },
+  "plan": {
+    "jid": "<job-id>",
+    "name": "Click Event Count",
+    "nodes": [
+      {
+        "id": "<vertex-id>",
+        "parallelism": 2,
+        "operator": "",
+        "operator_strategy": "",
+        "description": "ClickEventStatistics Sink",
+        "inputs": [
+          {
+            "num": 0,
+            "id": "<vertex-id>",
+            "ship_strategy": "FORWARD",
+            "exchange": "pipelined_bounded"
+          }
+        ],
+        "optimizer_properties": {}
+      },
+      {
+        "id": "<vertex-id>",
+        "parallelism": 2,
+        "operator": "",
+        "operator_strategy": "",
+        "description": "ClickEvent Counter",
+        "inputs": [
+          {
+            "num": 0,
+            "id": "<vertex-id>",
+            "ship_strategy": "HASH",
+            "exchange": "pipelined_bounded"
+          }
+        ],
+        "optimizer_properties": {}
+      },
+      {
+        "id": "<vertex-id>",
+        "parallelism": 2,
+        "operator": "",
+        "operator_strategy": "",
+        "description": "Timestamps/Watermarks",
+        "inputs": [
+          {
+            "num": 0,
+            "id": "<vertex-id>",
+            "ship_strategy": "FORWARD",
+            "exchange": "pipelined_bounded"
+          }
+        ],
+        "optimizer_properties": {}
+      },
+      {
+        "id": "<vertex-id>",
+        "parallelism": 2,
+        "operator": "",
+        "operator_strategy": "",
+        "description": "ClickEvent Source",
+        "optimizer_properties": {}
+      }
+    ]
+  }
+}
+{% endhighlight %}
+
+Please consult the [REST API reference](https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#api)
+for a complete list of possible queries including how to query metrics of different scopes (e.g. 
+TaskManager metrics);
+
+{%  top %}
+
+## Variants
+
+You might have noticed that the *Click Event Count* was always started with `--checkpointing` and 
+`--event-time` program arguments. By omitting these in the command of the *client* container in the 
+`docker-compose.yaml`, you can change the behavior of the Job.
+
+* `--checkpointing` enables [checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html), 
+which is Flink's fault-tolerance mechanism. If you run without it and go through 
+[failure and recovery](#observing-failure--recovery), you should will see that data is actually 
+lost.
+
+* `--event-time` enables [event time semantics]({{ site.baseurl }}/dev/event_time.html) for your 
+Job. When disabled, the Job will assign events to windows based on the wall-clock time instead of 
+the timestamp of the `ClickEvent`. Consequently, the number of events per window will not be exactly
+one thousand anymore. 
diff --git a/docs/getting-started/docker-playgrounds/index.md b/docs/getting-started/docker-playgrounds/index.md
new file mode 100644
index 0000000..2051e46
--- /dev/null
+++ b/docs/getting-started/docker-playgrounds/index.md
@@ -0,0 +1,25 @@
+---
+title: Docker Playgrounds
+nav-id: docker-playgrounds
+nav-title: '<i class="fa fa-ship title appetizer" aria-hidden="true"></i> Docker Playgrounds'
+nav-parent_id: getting-started
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/getting-started/docker-playgrounds/index.zh.md b/docs/getting-started/docker-playgrounds/index.zh.md
new file mode 100644
index 0000000..2051e46
--- /dev/null
+++ b/docs/getting-started/docker-playgrounds/index.zh.md
@@ -0,0 +1,25 @@
+---
+title: Docker Playgrounds
+nav-id: docker-playgrounds
+nav-title: '<i class="fa fa-ship title appetizer" aria-hidden="true"></i> Docker Playgrounds'
+nav-parent_id: getting-started
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 1e80eaa..9697edf 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -193,6 +193,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-examples-streaming-click-event-count_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-examples-streaming-twitter_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 7289153..b184eb9 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -239,6 +239,17 @@ under the License.
 				<exclude>original-*.jar</exclude>
 			</excludes>
 		</fileSet>
+		<fileSet>
+			<directory>../flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/target</directory>
+			<outputDirectory>examples/streaming</outputDirectory>
+			<fileMode>0644</fileMode>
+			<includes>
+				<include>*.jar</include>
+			</includes>
+			<excludes>
+				<exclude>original-*.jar</exclude>
+			</excludes>
+		</fileSet>
 
 		<!-- copy jar files of the gelly examples -->
 		<fileSet>
diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/pom.xml b/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/pom.xml
new file mode 100644
index 0000000..c2c23eb
--- /dev/null
+++ b/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/pom.xml
@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<artifactId>flink-examples-build-helper</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-examples-streaming-click-event-count_${scala.binary.version}</artifactId>
+	<name>flink-examples-streaming-click-event-count</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<finalName>ClickEventCount</finalName>
+
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-deploy-plugin</artifactId>
+				<configuration>
+					<skip>true</skip>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<shadedArtifactAttached>false</shadedArtifactAttached>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.examples.windowing.clickeventcount.ClickEventCount</mainClass>
+								</transformer>
+							</transformers>
+							<artifactSet>
+								<includes>
+									<include>org.apache.flink:flink-connector-kafka*</include>
+									<include>org.apache.flink:flink-examples-streaming*</include>
+									<include>org.apache.kafka:*</include>
+								</includes>
+							</artifactSet>
+
+							<filters>
+								<filter>
+									<artifact>org.apache.flink:flink-examples-streaming_*</artifact>
+									<includes>
+										<include>org/apache/flink/streaming/examples/windowing/clickeventcount/**</include>
+									</includes>
+								</filter>
+								<filter>
+									<artifact>org.apache.kafka:*</artifact>
+									<excludes>
+										<exclude>LICENSE</exclude>
+										<!-- Does not contain anything relevant.
+											Cites a binary dependency on jersey, but this is neither reflected in the
+											dependency graph, nor are any jersey files bundled. -->
+										<exclude>NOTICE</exclude>
+									</excludes>
+								</filter>
+							</filters>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE b/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/src/main/resources/META-INF/NOTICE
similarity index 78%
copy from flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE
copy to flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/src/main/resources/META-INF/NOTICE
index af71222..137e0c6 100644
--- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE
+++ b/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/src/main/resources/META-INF/NOTICE
@@ -1,4 +1,4 @@
-flink-examples-streaming-state-machine
+flink-examples-streaming-click-event-count
 Copyright 2014-2019 The Apache Software Foundation
 
 This product includes software developed at
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.kafka:kafka-clients:0.10.2.1
+- org.apache.kafka:kafka-clients:2.2.0
diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE b/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE
index af71222..9161b8b 100644
--- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE
+++ b/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.kafka:kafka-clients:0.10.2.1
+- org.apache.kafka:kafka-clients:2.2.0
diff --git a/flink-examples/flink-examples-build-helper/pom.xml b/flink-examples/flink-examples-build-helper/pom.xml
index 6dd769d..6684ef4 100644
--- a/flink-examples/flink-examples-build-helper/pom.xml
+++ b/flink-examples/flink-examples-build-helper/pom.xml
@@ -36,6 +36,7 @@ under the License.
 		<module>flink-examples-streaming-twitter</module>
 		<module>flink-examples-streaming-state-machine</module>
 		<module>flink-examples-streaming-gcp-pubsub</module>
+		<module>flink-examples-streaming-click-event-count</module>
 	</modules>
 
 </project>
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index a6109f2..875ff1c 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -58,7 +58,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -364,7 +364,6 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
-
 				</executions>
 			</plugin>
 
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
index 059b2c0..aa030a7 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.examples.statemachine;
 
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
 import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
 
@@ -46,7 +46,7 @@ public class KafkaEventsGeneratorJob {
 
 		env
 			.addSource(new EventsGeneratorSource(errorRate, sleep))
-			.addSink(new FlinkKafkaProducer010<>(brokers, kafkaTopic, new EventDeSerializer()));
+			.addSink(new FlinkKafkaProducer<>(brokers, kafkaTopic, new EventDeSerializer()));
 
 		// trigger program execution
 		env.execute("State machine example Kafka events generator job");
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index 054ed0a..00e2b0e 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.examples.statemachine.dfa.State;
 import org.apache.flink.streaming.examples.statemachine.event.Alert;
 import org.apache.flink.streaming.examples.statemachine.event.Event;
@@ -82,7 +82,7 @@ public class StateMachineExample {
 			Properties kafkaProps = new Properties();
 			kafkaProps.setProperty("bootstrap.servers", brokers);
 
-			FlinkKafkaConsumer010<Event> kafka = new FlinkKafkaConsumer010<>(kafkaTopic, new EventDeSerializer(), kafkaProps);
+			FlinkKafkaConsumer<Event> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps);
 			kafka.setStartFromLatest();
 			kafka.setCommitOffsetsOnCheckpoints(false);
 			source = kafka;
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventCount.java
new file mode 100644
index 0000000..2c02292
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventCount.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.clickeventcount;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.functions.ClickEventStatisticsCollector;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.functions.CountingAggregator;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventDeserializationSchema;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventStatistics;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventStatisticsSerializationSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A simple streaming job reading {@link ClickEvent}s from Kafka, counting events per 15 seconds and
+ * writing the resulting {@link ClickEventStatistics} back to Kafka.
+ *
+ * <p> It can be run with or without checkpointing and with event time or processing time semantics.
+ * </p>
+ *
+ * <p>The Job can be configured via the command line:</p>
+ * * "--checkpointing": enables checkpointing
+ * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
+ * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
+ * * "--bootstrap.servers": comma-separated list of Kafka brokers
+ *
+ */
+public class ClickEventCount {
+
+	public static final String CHECKPOINTING_OPTION = "checkpointing";
+	public static final String EVENT_TIME_OPTION = "event-time";
+
+	public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		configureEnvironment(params, env);
+
+		String inputTopic = params.get("input-topic", "input");
+		String outputTopic = params.get("output-topic", "output");
+		String brokers = params.get("bootstrap.servers", "localhost:9092");
+		Properties kafkaProps = new Properties();
+		kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+		kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
+
+		env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
+			.name("ClickEvent Source")
+			.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
+				@Override
+				public long extractTimestamp(final ClickEvent element) {
+					return element.getTimestamp().getTime();
+				}
+			})
+			.keyBy(ClickEvent::getPage)
+			.timeWindow(WINDOW_SIZE)
+			.aggregate(new CountingAggregator(),
+				new ClickEventStatisticsCollector())
+			.name("ClickEvent Counter")
+			.addSink(new FlinkKafkaProducer<>(
+				outputTopic,
+				new ClickEventStatisticsSerializationSchema(outputTopic),
+				kafkaProps,
+				FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
+			.name("ClickEventStatistics Sink");
+
+		env.execute("Click Event Count");
+	}
+
+	private static void configureEnvironment(
+			final ParameterTool params,
+			final StreamExecutionEnvironment env) {
+
+		boolean checkpointingEnabled = params.has(CHECKPOINTING_OPTION);
+		boolean eventTimeSemantics = params.has(EVENT_TIME_OPTION);
+
+		if (checkpointingEnabled) {
+			env.enableCheckpointing(1000);
+		}
+
+		if (eventTimeSemantics) {
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		}
+
+		//disabling Operator chaining to make it easier to follow the Job in the WebUI
+		env.disableOperatorChaining();
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventGenerator.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventGenerator.java
new file mode 100644
index 0000000..17943bf
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventGenerator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.clickeventcount;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventSerializationSchema;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.examples.windowing.clickeventcount.ClickEventCount.WINDOW_SIZE;
+
+/**
+ * A generator which pushes {@link ClickEvent}s into a Kafka Topic configured via `--topic` and
+ * `--bootstrap.servers`.
+ *
+ * <p> The generator creates the same number of {@link ClickEvent}s for all pages. The delay between
+ * events is chosen such that processing time and event time roughly align. The generator always
+ * creates the same sequence of events. </p>
+ *
+ */
+public class ClickEventGenerator {
+
+	public static final int EVENTS_PER_WINDOW = 1000;
+
+	private static final List<String> pages = Arrays.asList("/help", "/index", "/shop", "/jobs", "/about", "/news");
+
+	//this calculation is only accurate as long as pages.size() * EVENTS_PER_WINDOW divides the
+	//window size
+	public static final long DELAY = WINDOW_SIZE.toMilliseconds() / pages.size() / EVENTS_PER_WINDOW;
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool params = ParameterTool.fromArgs(args);
+
+		String topic = params.get("topic", "input");
+
+		Properties kafkaProps = createKafkaProperties(params);
+
+		KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps);
+
+		ClickIterator clickIterator = new ClickIterator();
+
+		while (true) {
+
+			ProducerRecord<byte[], byte[]> record = new ClickEventSerializationSchema(topic).serialize(
+					clickIterator.next(),
+					null);
+
+			producer.send(record);
+
+			Thread.sleep(DELAY);
+		}
+	}
+
+	private static Properties createKafkaProperties(final ParameterTool params) {
+		String brokers = params.get("bootstrap.servers", "localhost:9092");
+		Properties kafkaProps = new Properties();
+		kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+		kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		return kafkaProps;
+	}
+
+	static class ClickIterator  {
+
+		private Map<String, Long> nextTimestampPerKey;
+		private int nextPageIndex;
+
+		ClickIterator() {
+			nextTimestampPerKey = new HashMap<>();
+			nextPageIndex = 0;
+		}
+
+		ClickEvent next() {
+			String page = nextPage();
+			return new ClickEvent(nextTimestamp(page), page);
+		}
+
+		private Date nextTimestamp(String page) {
+			long nextTimestamp = nextTimestampPerKey.getOrDefault(page, 0L);
+			nextTimestampPerKey.put(page, nextTimestamp + WINDOW_SIZE.toMilliseconds() / EVENTS_PER_WINDOW);
+			return new Date(nextTimestamp);
+		}
+
+		private String nextPage() {
+			String nextPage = pages.get(nextPageIndex);
+			if (nextPageIndex == pages.size() - 1) {
+				nextPageIndex = 0;
+			} else {
+				nextPageIndex++;
+			}
+			return nextPage;
+		}
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/ClickEventStatisticsCollector.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/ClickEventStatisticsCollector.java
new file mode 100644
index 0000000..76af954
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/ClickEventStatisticsCollector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.clickeventcount.functions;
+
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventStatistics;
+import org.apache.flink.util.Collector;
+
+import java.util.Date;
+
+/**
+ * A simple {@link ProcessWindowFunction}, which wraps a count of {@link ClickEvent}s into an
+ * instance of {@link ClickEventStatistics}.
+ *
+ **/
+public class ClickEventStatisticsCollector
+		extends ProcessWindowFunction<Long, ClickEventStatistics, String, TimeWindow> {
+
+	@Override
+	public void process(
+			final String page,
+			final Context context,
+			final Iterable<Long> elements,
+			final Collector<ClickEventStatistics> out) throws Exception {
+
+		Long count = elements.iterator().next();
+
+		out.collect(new ClickEventStatistics(new Date(context.window().getStart()), new Date(context.window().getEnd()), page, count));
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/CountingAggregator.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/CountingAggregator.java
new file mode 100644
index 0000000..86f8190
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/CountingAggregator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.clickeventcount.functions;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent;
+
+/**
+ * An {@link AggregateFunction} which simply counts {@link ClickEvent}s.
+ *
+ */
+public class CountingAggregator implements AggregateFunction<ClickEvent, Long, Long> {
+	@Override
+	public Long createAccumulator() {
+		return 0L;
+	}
+
+	@Override
+	public Long add(final ClickEvent value, final Long accumulator) {
+		return accumulator + 1;
+	}
+
+	@Override
+	public Long getResult(final Long accumulator) {
+		return accumulator;
+	}
+
+	@Override
+	public Long merge(final Long a, final Long b) {
+		return a + b;
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEvent.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEvent.java
new file mode 100644
index 0000000..47e0882
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEvent.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
+
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * A simple event recording a click on a {@link ClickEvent#page} at time {@link ClickEvent#timestamp}.
+ *
+ */
+public class ClickEvent {
+
+	//using java.util.Date for better readability in Flink Cluster Playground
+	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
+	private Date timestamp;
+	private String page;
+
+	public ClickEvent() {
+	}
+
+	public ClickEvent(final Date timestamp, final String page) {
+		this.timestamp = timestamp;
+		this.page = page;
+	}
+
+	public Date getTimestamp() {
+		return timestamp;
+	}
+
+	public void setTimestamp(final Date timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	public String getPage() {
+		return page;
+	}
+
+	public void setPage(final String page) {
+		this.page = page;
+	}
+
+	@Override
+	public boolean equals(final Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final ClickEvent that = (ClickEvent) o;
+		return Objects.equals(timestamp, that.timestamp) && Objects.equals(page, that.page);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(timestamp, page);
+	}
+
+	@Override
+	public String toString() {
+		final StringBuilder sb = new StringBuilder("ClickEvent{");
+		sb.append("timestamp=").append(timestamp);
+		sb.append(", page='").append(page).append('\'');
+		sb.append('}');
+		return sb.toString();
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventDeserializationSchema.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventDeserializationSchema.java
new file mode 100644
index 0000000..8da3ad1
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventDeserializationSchema.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
+ *
+ */
+public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final ObjectMapper objectMapper = new ObjectMapper();
+
+	@Override
+	public ClickEvent deserialize(byte[] message) throws IOException {
+		return objectMapper.readValue(message, ClickEvent.class);
+	}
+
+	@Override
+	public boolean isEndOfStream(ClickEvent nextElement) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<ClickEvent> getProducedType() {
+		return TypeInformation.of(ClickEvent.class);
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventSerializationSchema.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventSerializationSchema.java
new file mode 100644
index 0000000..fda0d05
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventSerializationSchema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
+
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEvent}s as JSON.
+ *
+ */
+public class ClickEventSerializationSchema implements KafkaSerializationSchema<ClickEvent> {
+
+	private static final ObjectMapper objectMapper = new ObjectMapper();
+	private String topic;
+
+	public ClickEventSerializationSchema(){
+	}
+
+	public ClickEventSerializationSchema(String topic) {
+		this.topic = topic;
+	}
+
+	@Override
+	public ProducerRecord<byte[], byte[]> serialize(
+			final ClickEvent message, @Nullable final Long timestamp) {
+		try {
+			//if topic is null, default topic will be used
+			return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
+		} catch (JsonProcessingException e) {
+			throw new IllegalArgumentException("Could not serialize record: " + message, e);
+		}
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatistics.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatistics.java
new file mode 100644
index 0000000..ade3911
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatistics.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
+
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * A small wrapper class for windowed page counts.
+ *
+ */
+public class ClickEventStatistics {
+
+	//using java.util.Date for better readability in Flink Cluster Playground
+	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
+	private Date windowStart;
+	//using java.util.Date for better readability in Flink Cluster Playground
+	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
+	private Date windowEnd;
+	private String page;
+	private long count;
+
+	public ClickEventStatistics() {
+	}
+
+	public ClickEventStatistics(
+			final Date windowStart,
+			final Date windowEnd,
+			final String page,
+			final long count) {
+		this.windowStart = windowStart;
+		this.windowEnd = windowEnd;
+		this.page = page;
+		this.count = count;
+	}
+
+	public Date getWindowStart() {
+		return windowStart;
+	}
+
+	public void setWindowStart(final Date windowStart) {
+		this.windowStart = windowStart;
+	}
+
+	public Date getWindowEnd() {
+		return windowEnd;
+	}
+
+	public void setWindowEnd(final Date windowEnd) {
+		this.windowEnd = windowEnd;
+	}
+
+	public String getPage() {
+		return page;
+	}
+
+	public void setPage(final String page) {
+		this.page = page;
+	}
+
+	public long getCount() {
+		return count;
+	}
+
+	public void setCount(final long count) {
+		this.count = count;
+	}
+
+	@Override
+	public boolean equals(final Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final ClickEventStatistics that = (ClickEventStatistics) o;
+		return count == that.count &&
+				Objects.equals(windowStart, that.windowStart) &&
+				Objects.equals(windowEnd, that.windowEnd) &&
+				Objects.equals(page, that.page);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(windowStart, windowEnd, page, count);
+	}
+
+	@Override
+	public String toString() {
+		final StringBuilder sb = new StringBuilder("ClickEventStatistics{");
+		sb.append("windowStart=").append(windowStart);
+		sb.append(", windowEnd=").append(windowEnd);
+		sb.append(", page='").append(page).append('\'');
+		sb.append(", count=").append(count);
+		sb.append('}');
+		return sb.toString();
+	}
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatisticsSerializationSchema.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatisticsSerializationSchema.java
new file mode 100644
index 0000000..897691f
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatisticsSerializationSchema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
+
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
+ *
+ */
+public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {
+
+	private static final ObjectMapper objectMapper = new ObjectMapper();
+	private String topic;
+
+	public ClickEventStatisticsSerializationSchema(){
+	}
+
+	public ClickEventStatisticsSerializationSchema(String topic) {
+		this.topic = topic;
+	}
+
+	@Override
+	public ProducerRecord<byte[], byte[]> serialize(
+			final ClickEventStatistics message, @Nullable final Long timestamp) {
+		try {
+			//if topic is null, default topic will be used
+			return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
+		} catch (JsonProcessingException e) {
+			throw new IllegalArgumentException("Could not serialize record: " + message, e);
+		}
+	}
+}