You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2019/08/19 16:10:03 UTC

[flink-playgrounds] 02/02: [FLINK-12749] Add Dockerfile for Operations Playground image

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git

commit 626ca07bf71f0b1340df25677ae1ec0969719a9c
Author: Fabian Hueske <fh...@apache.org>
AuthorDate: Mon Aug 12 18:48:23 2019 +0200

    [FLINK-12749] Add Dockerfile for Operations Playground image
    
    * Rename Cluster Playground to Operations Playground
    * Add Java code for Click Count demo job
    * Add Dockerfile to build the custom image for the operations playground
    * Add README files
---
 .gitignore                                         |   3 +
 README.md                                          |  21 +-
 .../ops-playground-image/Dockerfile                |  32 ++-
 docker/ops-playground-image/README.md              |   5 +
 .../java/flink-playground-clickcountjob/LICENSE    | 201 ++++++++++++++++++
 .../java/flink-playground-clickcountjob/pom.xml    | 230 +++++++++++++++++++++
 .../ops/clickcount/ClickEventCount.java            | 116 +++++++++++
 .../ops/clickcount/ClickEventGenerator.java        | 122 +++++++++++
 .../functions/ClickEventStatisticsCollector.java   |  47 +++++
 .../clickcount/functions/CountingAggregator.java   |  47 +++++
 .../ops/clickcount/records/ClickEvent.java         |  85 ++++++++
 .../records/ClickEventDeserializationSchema.java   |  51 +++++
 .../records/ClickEventSerializationSchema.java     |  46 +++++
 .../clickcount/records/ClickEventStatistics.java   | 116 +++++++++++
 .../ClickEventStatisticsSerializationSchema.java   |  42 ++++
 .../src/main/resources/log4j.properties            |  15 +-
 howto-update-playgrounds.md                        |  42 ++++
 operations-playground/README.md                    |  50 +++++
 .../conf/flink-conf.yaml                           |   0
 .../conf/log4j-cli.properties                      |   0
 .../conf/log4j-console.properties                  |   0
 .../docker-compose.yaml                            |  13 +-
 22 files changed, 1247 insertions(+), 37 deletions(-)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d4e4d76
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*/.idea
+*/target
+*/dependency-reduced-pom.xml
diff --git a/README.md b/README.md
index 8fe2903..c9881a3 100644
--- a/README.md
+++ b/README.md
@@ -1,20 +1,23 @@
 # Apache Flink Playgrounds
 
-Apache Flink is an open source stream processing framework with powerful stream- and batch-
-processing capabilities.
+This repository provides playgrounds to quickly and easily explore [Apache Flink](https://flink.apache.org)'s features.
 
-Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/)
+The playgrounds are based on [docker-compose](https://docs.docker.com/compose/) environments.
+Each subfolder of this repository contains the docker-compose setup of a playground, except for the `./docker` folder which contains code and configuration to build custom Docker images for the playgrounds.
 
-## Playgrounds
+## Available Playgrounds
 
-This repository contains the configuration files for two Apache Flink playgrounds.
+Currently, the following playgrounds are available:
 
-* The [Flink Cluster Playground](../master/flink-cluster-playground) consists of a Flink Session Cluster, a Kafka Cluster and a simple 
-Flink Job. It is explained in detail as part of 
-[Apache Flink's "Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/docker-playgrounds/flink-cluster-playground.html). 
+* The **Flink Operations Playground** in the (`operations-playground` folder) let's you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example 
+Flink job. The playground is presented in detail in the
+["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
 
 * The interactive SQL playground is still under development and will be added shortly.
 
 ## About
 
-Apache Flink is an open source project of The Apache Software Foundation (ASF).
\ No newline at end of file
+Apache Flink is an open source project of The Apache Software Foundation (ASF).
+
+Flink is distributed data processing framework with powerful stream and batch processing capabilities.
+Learn more about Flink at [http://flink.apache.org/](https://flink.apache.org/)
diff --git a/flink-cluster-playground/conf/flink-conf.yaml b/docker/ops-playground-image/Dockerfile
similarity index 52%
copy from flink-cluster-playground/conf/flink-conf.yaml
copy to docker/ops-playground-image/Dockerfile
index 5c8d0e6..8b64428 100644
--- a/flink-cluster-playground/conf/flink-conf.yaml
+++ b/docker/ops-playground-image/Dockerfile
@@ -1,4 +1,4 @@
-################################################################################
+###############################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -14,17 +14,27 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 # limitations under the License.
-################################################################################
+###############################################################################
 
-jobmanager.rpc.address: jobmanager
-blob.server.port: 6124
-query.server.port: 6125
+###############################################################################
+# Build Click Count Job
+###############################################################################
 
-taskmanager.numberOfTaskSlots: 2
+FROM maven:3.6-jdk-8-slim AS builder
 
-state.backend: filesystem
-state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
-state.savepoints.dir: file:///tmp/flink-savepoints-directory
+# Get Click Count job and compile it
+COPY ./java/flink-playground-clickcountjob /opt/flink-playground-clickcountjob
+WORKDIR /opt/flink-playground-clickcountjob
+RUN mvn clean install
 
-heartbeat.interval: 1000
-heartbeat.timeout: 5000
+
+###############################################################################
+# Build Operations Playground Image
+###############################################################################
+
+FROM flink:1.8.1-scala_2.11
+
+WORKDIR /opt/flink/bin
+
+# Copy Click Count Job
+COPY --from=builder /opt/flink-playground-clickcountjob/target/flink-playground-clickcountjob-*.jar /opt/ClickCountJob.jar
diff --git a/docker/ops-playground-image/README.md b/docker/ops-playground-image/README.md
new file mode 100644
index 0000000..a22382d
--- /dev/null
+++ b/docker/ops-playground-image/README.md
@@ -0,0 +1,5 @@
+# Flink Operations Playground Image
+
+The image defined by the `Dockerfile` in this folder is required by the Flink operations playground.
+
+The image is based on the official Flink image and adds a demo Flink job (Click Event Count) and a corresponding data generator. The code of the application is located in the `./java/flink-ops-playground` folder.
\ No newline at end of file
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/LICENSE b/docker/ops-playground-image/java/flink-playground-clickcountjob/LICENSE
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
new file mode 100644
index 0000000..f1f9b89
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -0,0 +1,230 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-playground-clickcountjob</artifactId>
+	<version>1-FLINK-1.8_2.11</version>
+
+	<name>flink-playground-clickcountjob</name>
+	<packaging>jar</packaging>
+	<url>https://flink.apache.org</url>
+
+	<licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <scm>
+        <url>https://github.com/apache/flink-playgrounds</url>
+        <connection>git@github.com:apache/flink-playgrounds.git</connection>
+        <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flink-playgrounds.git</developerConnection>
+    </scm>
+
+    <properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flink.version>1.8.1</flink.version>
+		<java.version>1.8</java.version>
+		<scala.binary.version>2.11</scala.binary.version>
+		<maven.compiler.source>${java.version}</maven.compiler.source>
+		<maven.compiler.target>${java.version}</maven.compiler.target>
+	</properties>
+
+	<dependencies>
+		<!-- Apache Flink dependencies -->
+		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Connector dependencies -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
+		<!-- Logging dependencies -->
+		<!-- These dependencies are excluded from the application JAR by default. -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.7.7</version>
+			<scope>runtime</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.17</version>
+			<scope>runtime</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+
+			<!-- Java Compiler -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>${java.version}</source>
+					<target>${java.version}</target>
+				</configuration>
+			</plugin>
+
+			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Run shade goal on package phase -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>org.apache.flink:force-shading</exclude>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<!-- Do not copy the signatures in the META-INF folder.
+									Otherwise, this might cause SecurityExceptions when using the JAR. -->
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.playgrounds.ops.clickcount.ClickEventCount</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+
+				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-shade-plugin</artifactId>
+										<versionRange>[3.0.0,)</versionRange>
+										<goals>
+											<goal>shade</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-compiler-plugin</artifactId>
+										<versionRange>[3.1,)</versionRange>
+										<goals>
+											<goal>testCompile</goal>
+											<goal>compile</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
+	<!-- This profile helps to make things run out of the box in IntelliJ -->
+	<!-- Its adds Flink's core classes to the runtime class path. -->
+	<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+	<profiles>
+		<profile>
+			<id>add-dependencies-for-IDEA</id>
+
+			<activation>
+				<property>
+					<name>idea.version</name>
+				</property>
+			</activation>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-java</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+
+</project>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
new file mode 100644
index 0000000..9f609e9
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
+import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializationSchema;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A simple streaming job reading {@link ClickEvent}s from Kafka, counting events per 15 seconds and
+ * writing the resulting {@link ClickEventStatistics} back to Kafka.
+ *
+ * <p> It can be run with or without checkpointing and with event time or processing time semantics.
+ * </p>
+ *
+ * <p>The Job can be configured via the command line:</p>
+ * * "--checkpointing": enables checkpointing
+ * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
+ * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
+ * * "--bootstrap.servers": comma-separated list of Kafka brokers
+ *
+ */
+public class ClickEventCount {
+
+	public static final String CHECKPOINTING_OPTION = "checkpointing";
+	public static final String EVENT_TIME_OPTION = "event-time";
+
+	public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		configureEnvironment(params, env);
+
+		String inputTopic = params.get("input-topic", "input");
+		String outputTopic = params.get("output-topic", "output");
+		String brokers = params.get("bootstrap.servers", "localhost:9092");
+		Properties kafkaProps = new Properties();
+		kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+		kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
+
+		env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
+			.name("ClickEvent Source")
+			.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
+				@Override
+				public long extractTimestamp(final ClickEvent element) {
+					return element.getTimestamp().getTime();
+				}
+			})
+			.keyBy(ClickEvent::getPage)
+			.timeWindow(WINDOW_SIZE)
+			.aggregate(new CountingAggregator(),
+				new ClickEventStatisticsCollector())
+			.name("ClickEvent Counter")
+			.addSink(new FlinkKafkaProducer<ClickEventStatistics>(
+				outputTopic,
+				new ClickEventStatisticsSerializationSchema(),
+				kafkaProps))
+			.name("ClickEventStatistics Sink");
+
+		env.execute("Click Event Count");
+	}
+
+	private static void configureEnvironment(
+			final ParameterTool params,
+			final StreamExecutionEnvironment env) {
+
+		boolean checkpointingEnabled = params.has(CHECKPOINTING_OPTION);
+		boolean eventTimeSemantics = params.has(EVENT_TIME_OPTION);
+
+		if (checkpointingEnabled) {
+			env.enableCheckpointing(1000);
+		}
+
+		if (eventTimeSemantics) {
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		}
+
+		//disabling Operator chaining to make it easier to follow the Job in the WebUI
+		env.disableOperatorChaining();
+	}
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
new file mode 100644
index 0000000..6a5c394
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventSerializationSchema;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.playgrounds.ops.clickcount.ClickEventCount.WINDOW_SIZE;
+
+/**
+ * A generator which pushes {@link ClickEvent}s into a Kafka Topic configured via `--topic` and
+ * `--bootstrap.servers`.
+ *
+ * <p> The generator creates the same number of {@link ClickEvent}s for all pages. The delay between
+ * events is chosen such that processing time and event time roughly align. The generator always
+ * creates the same sequence of events. </p>
+ *
+ */
+public class ClickEventGenerator {
+
+	public static final int EVENTS_PER_WINDOW = 1000;
+
+	private static final List<String> pages = Arrays.asList("/help", "/index", "/shop", "/jobs", "/about", "/news");
+
+	//this calculation is only accurate as long as pages.size() * EVENTS_PER_WINDOW divides the
+	//window size
+	public static final long DELAY = WINDOW_SIZE.toMilliseconds() / pages.size() / EVENTS_PER_WINDOW;
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool params = ParameterTool.fromArgs(args);
+
+		String topic = params.get("topic", "input");
+
+		Properties kafkaProps = createKafkaProperties(params);
+
+		KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps);
+
+		ClickIterator clickIterator = new ClickIterator();
+		SerializationSchema<ClickEvent> clickSerializer = new ClickEventSerializationSchema();
+
+		while (true) {
+
+			byte[] message = clickSerializer.serialize(clickIterator.next());
+			ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, message);
+			producer.send(record);
+
+			Thread.sleep(DELAY);
+		}
+	}
+
+	private static Properties createKafkaProperties(final ParameterTool params) {
+		String brokers = params.get("bootstrap.servers", "localhost:9092");
+		Properties kafkaProps = new Properties();
+		kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+		kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		return kafkaProps;
+	}
+
+	static class ClickIterator  {
+
+		private Map<String, Long> nextTimestampPerKey;
+		private int nextPageIndex;
+
+		ClickIterator() {
+			nextTimestampPerKey = new HashMap<>();
+			nextPageIndex = 0;
+		}
+
+		ClickEvent next() {
+			String page = nextPage();
+			return new ClickEvent(nextTimestamp(page), page);
+		}
+
+		private Date nextTimestamp(String page) {
+			long nextTimestamp = nextTimestampPerKey.getOrDefault(page, 0L);
+			nextTimestampPerKey.put(page, nextTimestamp + WINDOW_SIZE.toMilliseconds() / EVENTS_PER_WINDOW);
+			return new Date(nextTimestamp);
+		}
+
+		private String nextPage() {
+			String nextPage = pages.get(nextPageIndex);
+			if (nextPageIndex == pages.size() - 1) {
+				nextPageIndex = 0;
+			} else {
+				nextPageIndex++;
+			}
+			return nextPage;
+		}
+	}
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/ClickEventStatisticsCollector.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/ClickEventStatisticsCollector.java
new file mode 100644
index 0000000..cb6af36
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/ClickEventStatisticsCollector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.functions;
+
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.Date;
+
+/**
+ * A simple {@link ProcessWindowFunction}, which wraps a count of {@link ClickEvent}s into an
+ * instance of {@link ClickEventStatistics}.
+ *
+ **/
+public class ClickEventStatisticsCollector
+		extends ProcessWindowFunction<Long, ClickEventStatistics, String, TimeWindow> {
+
+	@Override
+	public void process(
+			final String page,
+			final Context context,
+			final Iterable<Long> elements,
+			final Collector<ClickEventStatistics> out) throws Exception {
+
+		Long count = elements.iterator().next();
+
+		out.collect(new ClickEventStatistics(new Date(context.window().getStart()), new Date(context.window().getEnd()), page, count));
+	}
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/CountingAggregator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/CountingAggregator.java
new file mode 100644
index 0000000..5ebc381
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/CountingAggregator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.functions;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+
+/**
+ * An {@link AggregateFunction} which simply counts {@link ClickEvent}s.
+ *
+ */
+public class CountingAggregator implements AggregateFunction<ClickEvent, Long, Long> {
+	@Override
+	public Long createAccumulator() {
+		return 0L;
+	}
+
+	@Override
+	public Long add(final ClickEvent value, final Long accumulator) {
+		return accumulator + 1;
+	}
+
+	@Override
+	public Long getResult(final Long accumulator) {
+		return accumulator;
+	}
+
+	@Override
+	public Long merge(final Long a, final Long b) {
+		return a + b;
+	}
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEvent.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEvent.java
new file mode 100644
index 0000000..3c87a1f
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEvent.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
+
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * A simple event recording a click on a {@link ClickEvent#page} at time {@link ClickEvent#timestamp}.
+ *
+ */
+public class ClickEvent {
+
+	//using java.util.Date for better readability
+	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
+	private Date timestamp;
+	private String page;
+
+	public ClickEvent() {
+	}
+
+	public ClickEvent(final Date timestamp, final String page) {
+		this.timestamp = timestamp;
+		this.page = page;
+	}
+
+	public Date getTimestamp() {
+		return timestamp;
+	}
+
+	public void setTimestamp(final Date timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	public String getPage() {
+		return page;
+	}
+
+	public void setPage(final String page) {
+		this.page = page;
+	}
+
+	@Override
+	public boolean equals(final Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final ClickEvent that = (ClickEvent) o;
+		return Objects.equals(timestamp, that.timestamp) && Objects.equals(page, that.page);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(timestamp, page);
+	}
+
+	@Override
+	public String toString() {
+		final StringBuilder sb = new StringBuilder("ClickEvent{");
+		sb.append("timestamp=").append(timestamp);
+		sb.append(", page='").append(page).append('\'');
+		sb.append('}');
+		return sb.toString();
+	}
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java
new file mode 100644
index 0000000..bb47df5
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
+ *
+ */
+public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final ObjectMapper objectMapper = new ObjectMapper();
+
+	@Override
+	public ClickEvent deserialize(byte[] message) throws IOException {
+		return objectMapper.readValue(message, ClickEvent.class);
+	}
+
+	@Override
+	public boolean isEndOfStream(ClickEvent nextElement) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<ClickEvent> getProducedType() {
+		return TypeInformation.of(ClickEvent.class);
+	}
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
new file mode 100644
index 0000000..fab05d1
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A Kafka {@link SerializationSchema} to serialize {@link ClickEvent}s as JSON.
+ *
+ */
+public class ClickEventSerializationSchema implements SerializationSchema<ClickEvent> {
+
+	private static final ObjectMapper objectMapper = new ObjectMapper();
+
+	public ClickEventSerializationSchema() {
+		super();
+	}
+
+	@Override
+	public byte[] serialize(ClickEvent message) {
+		try {
+			//if topic is null, default topic will be used
+			return objectMapper.writeValueAsBytes(message);
+		} catch (JsonProcessingException e) {
+			throw new IllegalArgumentException("Could not serialize record: " + message, e);
+		}
+	}
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatistics.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatistics.java
new file mode 100644
index 0000000..de2d754
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatistics.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
+
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * A small wrapper class for windowed page counts.
+ *
+ */
+public class ClickEventStatistics {
+
+	//using java.util.Date for better readability
+	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
+	private Date windowStart;
+	//using java.util.Date for better readability
+	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
+	private Date windowEnd;
+	private String page;
+	private long count;
+
+	public ClickEventStatistics() {
+	}
+
+	public ClickEventStatistics(
+			final Date windowStart,
+			final Date windowEnd,
+			final String page,
+			final long count) {
+		this.windowStart = windowStart;
+		this.windowEnd = windowEnd;
+		this.page = page;
+		this.count = count;
+	}
+
+	public Date getWindowStart() {
+		return windowStart;
+	}
+
+	public void setWindowStart(final Date windowStart) {
+		this.windowStart = windowStart;
+	}
+
+	public Date getWindowEnd() {
+		return windowEnd;
+	}
+
+	public void setWindowEnd(final Date windowEnd) {
+		this.windowEnd = windowEnd;
+	}
+
+	public String getPage() {
+		return page;
+	}
+
+	public void setPage(final String page) {
+		this.page = page;
+	}
+
+	public long getCount() {
+		return count;
+	}
+
+	public void setCount(final long count) {
+		this.count = count;
+	}
+
+	@Override
+	public boolean equals(final Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final ClickEventStatistics that = (ClickEventStatistics) o;
+		return count == that.count &&
+				Objects.equals(windowStart, that.windowStart) &&
+				Objects.equals(windowEnd, that.windowEnd) &&
+				Objects.equals(page, that.page);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(windowStart, windowEnd, page, count);
+	}
+
+	@Override
+	public String toString() {
+		final StringBuilder sb = new StringBuilder("ClickEventStatistics{");
+		sb.append("windowStart=").append(windowStart);
+		sb.append(", windowEnd=").append(windowEnd);
+		sb.append(", page='").append(page).append('\'');
+		sb.append(", count=").append(count);
+		sb.append('}');
+		return sb.toString();
+	}
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
new file mode 100644
index 0000000..40a0dbd
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
+ *
+ */
+public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> {
+
+	private static final ObjectMapper objectMapper = new ObjectMapper();
+
+	@Override
+	public byte[] serialize(ClickEventStatistics message) {
+		try {
+			//if topic is null, default topic will be used
+			return objectMapper.writeValueAsBytes(message);
+		} catch (JsonProcessingException e) {
+			throw new IllegalArgumentException("Could not serialize record: " + message, e);
+		}
+	}
+}
diff --git a/flink-cluster-playground/conf/flink-conf.yaml b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties
similarity index 75%
copy from flink-cluster-playground/conf/flink-conf.yaml
copy to docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties
index 5c8d0e6..da32ea0 100644
--- a/flink-cluster-playground/conf/flink-conf.yaml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties
@@ -16,15 +16,8 @@
 # limitations under the License.
 ################################################################################
 
-jobmanager.rpc.address: jobmanager
-blob.server.port: 6124
-query.server.port: 6125
+log4j.rootLogger=INFO, console
 
-taskmanager.numberOfTaskSlots: 2
-
-state.backend: filesystem
-state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
-state.savepoints.dir: file:///tmp/flink-savepoints-directory
-
-heartbeat.interval: 1000
-heartbeat.timeout: 5000
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/howto-update-playgrounds.md b/howto-update-playgrounds.md
new file mode 100644
index 0000000..2885b62
--- /dev/null
+++ b/howto-update-playgrounds.md
@@ -0,0 +1,42 @@
+
+# Versioning 
+
+When updating the playgrounds we have to deal with three versions that need to be adjusted.
+
+Externally defined versions:
+
+* *Flink version*: Version of Apache Flink
+* *Flink Docker image version*: Version of the official Flink Docker image on [Docker Hub](https://hub.docker.com/_/flink)
+
+Internally defined version:
+
+* *Playground Version*: This version is used for the Java artifacts and Docker images. It follows the scheme of `<Playground-Version>-Flink-<Minor-Flink-Version>`. For example `2-Flink-1.9` denotes Version 2 of the playground for Flink 1.9.x.
+
+# Updating the playgrounds
+
+## Update playgrounds due to a new minor (or major) Flink release
+
+First of all, check that a Flink Docker image was published on [Docker Hub](https://hub.docker.com/_/flink) for the new Flink version.
+
+Update all playgrounds as follows:
+
+1. All `pom.xml`: 
+  * Update the versions of all Flink dependencies 
+  * Update the Maven artifact version to the new playground version (`1-Flink-1.9` if 1.9.0 is the new Flink release).
+  * Check that all Maven projects build.
+2. All `Dockerfile`: 
+	* Update version of the base image to the new Flink Docker image version.
+3. `docker-compose.yaml`: 
+	* Update the version of the Flink containers to the new Flink docker image version.
+	* Update the version of the custom Docker images to the new playground version.
+
+## Update a playground due to bug in a custom Docker image
+
+Whenever, you need to update a Docker image, please increment the playground version in the following places.
+
+* the artifact version of all Maven projects in all `pom.xml` files.
+* the tag (name and version) of all custom images in all `docker-compose.yaml` files.
+
+## Update a playground without updating a custom Docker image
+
+Just update the `docker-compose.yaml` file.
diff --git a/operations-playground/README.md b/operations-playground/README.md
new file mode 100644
index 0000000..5287be7
--- /dev/null
+++ b/operations-playground/README.md
@@ -0,0 +1,50 @@
+# Flink Operations Playground
+
+The Flink operations playground let's you explore and play with [Apache Flink](https://flink.apache.org)'s features to manage and operate stream processing jobs, including
+
+* Observing automatic failure recovery of an application
+* Upgrading and rescaling an application
+* Querying the runtime metrics of an application
+
+It is based on a [docker-compose](https://docs.docker.com/compose/) environment and super easy to setup.
+
+## Setup
+
+The operations playground requires a custom Docker image in addition to public images for Flink, Kafka, and ZooKeeper. 
+
+The `docker-compose.yaml` file of the operations playground is located in the `operations-playground` directory. Assuming you are at the root directory of the [`flink-playgrounds`](https://github.com/apache/flink-playgrounds) repository, change to the `operations-playground` folder by running
+
+```bash
+cd operations-playground
+```
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the playground
+
+```bash
+docker-compose up -d
+```
+
+You can check if the playground was successfully started, if you can access the WebUI of the Flink cluster at [http://localhost:8081](http://localhost:8081).
+
+### Stopping the Playground
+
+To stop the playground, run the following command
+
+```bash
+docker-compose down
+```
+
+## Further instructions
+
+The playground setup and more detailed instructions are presented in the
+["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
diff --git a/flink-cluster-playground/conf/flink-conf.yaml b/operations-playground/conf/flink-conf.yaml
similarity index 100%
rename from flink-cluster-playground/conf/flink-conf.yaml
rename to operations-playground/conf/flink-conf.yaml
diff --git a/flink-cluster-playground/conf/log4j-cli.properties b/operations-playground/conf/log4j-cli.properties
similarity index 100%
rename from flink-cluster-playground/conf/log4j-cli.properties
rename to operations-playground/conf/log4j-cli.properties
diff --git a/flink-cluster-playground/conf/log4j-console.properties b/operations-playground/conf/log4j-console.properties
similarity index 100%
rename from flink-cluster-playground/conf/log4j-console.properties
rename to operations-playground/conf/log4j-console.properties
diff --git a/flink-cluster-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
similarity index 81%
rename from flink-cluster-playground/docker-compose.yaml
rename to operations-playground/docker-compose.yaml
index 7842762..d498070 100644
--- a/flink-cluster-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -19,8 +19,9 @@
 version: "2.1"
 services:
   client:
-    image: flink:1.9-scala_2.11
-    command: "flink run -d -p 2 /opt/flink/examples/streaming/ClickEventCount.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
+    build: ../docker/ops-playground-image
+    image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+    command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
     depends_on:
       - jobmanager
       - kafka
@@ -29,12 +30,12 @@ services:
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   clickevent-generator:
-    image: flink:1.9-scala_2.11
-    command: "java -classpath /opt/flink/examples/streaming/ClickEventCount.jar:/opt/flink/lib/* org.apache.flink.streaming.examples.windowing.clickeventcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
+    image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+    command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
     depends_on:
       - kafka
   jobmanager:
-    image: flink:1.9-scala_2.11
+    image: flink:1.8-scala_2.11
     command: "jobmanager.sh start-foreground"
     ports:
       - 8081:8081
@@ -45,7 +46,7 @@ services:
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   taskmanager:
-    image: flink:1.9-scala_2.11
+    image: flink:1.8-scala_2.11
     depends_on:
       - jobmanager
     command: "taskmanager.sh start-foreground"