You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2019/08/19 16:10:03 UTC
[flink-playgrounds] 02/02: [FLINK-12749] Add Dockerfile for
Operations Playground image
This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git
commit 626ca07bf71f0b1340df25677ae1ec0969719a9c
Author: Fabian Hueske <fh...@apache.org>
AuthorDate: Mon Aug 12 18:48:23 2019 +0200
[FLINK-12749] Add Dockerfile for Operations Playground image
* Rename Cluster Playground to Operations Playground
* Add Java code for Click Count demo job
* Add Dockerfile to build the custom image for the operations playground
* Add README files
---
.gitignore | 3 +
README.md | 21 +-
.../ops-playground-image/Dockerfile | 32 ++-
docker/ops-playground-image/README.md | 5 +
.../java/flink-playground-clickcountjob/LICENSE | 201 ++++++++++++++++++
.../java/flink-playground-clickcountjob/pom.xml | 230 +++++++++++++++++++++
.../ops/clickcount/ClickEventCount.java | 116 +++++++++++
.../ops/clickcount/ClickEventGenerator.java | 122 +++++++++++
.../functions/ClickEventStatisticsCollector.java | 47 +++++
.../clickcount/functions/CountingAggregator.java | 47 +++++
.../ops/clickcount/records/ClickEvent.java | 85 ++++++++
.../records/ClickEventDeserializationSchema.java | 51 +++++
.../records/ClickEventSerializationSchema.java | 46 +++++
.../clickcount/records/ClickEventStatistics.java | 116 +++++++++++
.../ClickEventStatisticsSerializationSchema.java | 42 ++++
.../src/main/resources/log4j.properties | 15 +-
howto-update-playgrounds.md | 42 ++++
operations-playground/README.md | 50 +++++
.../conf/flink-conf.yaml | 0
.../conf/log4j-cli.properties | 0
.../conf/log4j-console.properties | 0
.../docker-compose.yaml | 13 +-
22 files changed, 1247 insertions(+), 37 deletions(-)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d4e4d76
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*/.idea
+*/target
+*/dependency-reduced-pom.xml
diff --git a/README.md b/README.md
index 8fe2903..c9881a3 100644
--- a/README.md
+++ b/README.md
@@ -1,20 +1,23 @@
# Apache Flink Playgrounds
-Apache Flink is an open source stream processing framework with powerful stream- and batch-
-processing capabilities.
+This repository provides playgrounds to quickly and easily explore [Apache Flink](https://flink.apache.org)'s features.
-Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/)
+The playgrounds are based on [docker-compose](https://docs.docker.com/compose/) environments.
+Each subfolder of this repository contains the docker-compose setup of a playground, except for the `./docker` folder which contains code and configuration to build custom Docker images for the playgrounds.
-## Playgrounds
+## Available Playgrounds
-This repository contains the configuration files for two Apache Flink playgrounds.
+Currently, the following playgrounds are available:
-* The [Flink Cluster Playground](../master/flink-cluster-playground) consists of a Flink Session Cluster, a Kafka Cluster and a simple
-Flink Job. It is explained in detail as part of
-[Apache Flink's "Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/docker-playgrounds/flink-cluster-playground.html).
+* The **Flink Operations Playground** in the (`operations-playground` folder) let's you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example
+Flink job. The playground is presented in detail in the
+["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
* The interactive SQL playground is still under development and will be added shortly.
## About
-Apache Flink is an open source project of The Apache Software Foundation (ASF).
\ No newline at end of file
+Apache Flink is an open source project of The Apache Software Foundation (ASF).
+
+Flink is distributed data processing framework with powerful stream and batch processing capabilities.
+Learn more about Flink at [http://flink.apache.org/](https://flink.apache.org/)
diff --git a/flink-cluster-playground/conf/flink-conf.yaml b/docker/ops-playground-image/Dockerfile
similarity index 52%
copy from flink-cluster-playground/conf/flink-conf.yaml
copy to docker/ops-playground-image/Dockerfile
index 5c8d0e6..8b64428 100644
--- a/flink-cluster-playground/conf/flink-conf.yaml
+++ b/docker/ops-playground-image/Dockerfile
@@ -1,4 +1,4 @@
-################################################################################
+###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,17 +14,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-################################################################################
+###############################################################################
-jobmanager.rpc.address: jobmanager
-blob.server.port: 6124
-query.server.port: 6125
+###############################################################################
+# Build Click Count Job
+###############################################################################
-taskmanager.numberOfTaskSlots: 2
+FROM maven:3.6-jdk-8-slim AS builder
-state.backend: filesystem
-state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
-state.savepoints.dir: file:///tmp/flink-savepoints-directory
+# Get Click Count job and compile it
+COPY ./java/flink-playground-clickcountjob /opt/flink-playground-clickcountjob
+WORKDIR /opt/flink-playground-clickcountjob
+RUN mvn clean install
-heartbeat.interval: 1000
-heartbeat.timeout: 5000
+
+###############################################################################
+# Build Operations Playground Image
+###############################################################################
+
+FROM flink:1.8.1-scala_2.11
+
+WORKDIR /opt/flink/bin
+
+# Copy Click Count Job
+COPY --from=builder /opt/flink-playground-clickcountjob/target/flink-playground-clickcountjob-*.jar /opt/ClickCountJob.jar
diff --git a/docker/ops-playground-image/README.md b/docker/ops-playground-image/README.md
new file mode 100644
index 0000000..a22382d
--- /dev/null
+++ b/docker/ops-playground-image/README.md
@@ -0,0 +1,5 @@
+# Flink Operations Playground Image
+
+The image defined by the `Dockerfile` in this folder is required by the Flink operations playground.
+
+The image is based on the official Flink image and adds a demo Flink job (Click Event Count) and a corresponding data generator. The code of the application is located in the `./java/flink-ops-playground` folder.
\ No newline at end of file
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/LICENSE b/docker/ops-playground-image/java/flink-playground-clickcountjob/LICENSE
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
new file mode 100644
index 0000000..f1f9b89
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -0,0 +1,230 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-playground-clickcountjob</artifactId>
+ <version>1-FLINK-1.8_2.11</version>
+
+ <name>flink-playground-clickcountjob</name>
+ <packaging>jar</packaging>
+ <url>https://flink.apache.org</url>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <scm>
+ <url>https://github.com/apache/flink-playgrounds</url>
+ <connection>git@github.com:apache/flink-playgrounds.git</connection>
+ <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flink-playgrounds.git</developerConnection>
+ </scm>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>1.8.1</flink.version>
+ <java.version>1.8</java.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <maven.compiler.source>${java.version}</maven.compiler.source>
+ <maven.compiler.target>${java.version}</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <!-- Apache Flink dependencies -->
+ <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Connector dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- Logging dependencies -->
+ <!-- These dependencies are excluded from the application JAR by default. -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.7</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Java Compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
+ <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+ <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.flink:force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.playgrounds.ops.clickcount.ClickEventCount</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+
+ <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <versionRange>[3.0.0,)</versionRange>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <versionRange>[3.1,)</versionRange>
+ <goals>
+ <goal>testCompile</goal>
+ <goal>compile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+ <!-- This profile helps to make things run out of the box in IntelliJ -->
+ <!-- Its adds Flink's core classes to the runtime class path. -->
+ <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+ <profiles>
+ <profile>
+ <id>add-dependencies-for-IDEA</id>
+
+ <activation>
+ <property>
+ <name>idea.version</name>
+ </property>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+
+</project>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
new file mode 100644
index 0000000..9f609e9
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
+import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializationSchema;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A simple streaming job reading {@link ClickEvent}s from Kafka, counting events per 15 seconds and
+ * writing the resulting {@link ClickEventStatistics} back to Kafka.
+ *
+ * <p> It can be run with or without checkpointing and with event time or processing time semantics.
+ * </p>
+ *
+ * <p>The Job can be configured via the command line:</p>
+ * * "--checkpointing": enables checkpointing
+ * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
+ * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
+ * * "--bootstrap.servers": comma-separated list of Kafka brokers
+ *
+ */
+public class ClickEventCount {
+
+ public static final String CHECKPOINTING_OPTION = "checkpointing";
+ public static final String EVENT_TIME_OPTION = "event-time";
+
+ public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
+
+ public static void main(String[] args) throws Exception {
+ final ParameterTool params = ParameterTool.fromArgs(args);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ configureEnvironment(params, env);
+
+ String inputTopic = params.get("input-topic", "input");
+ String outputTopic = params.get("output-topic", "output");
+ String brokers = params.get("bootstrap.servers", "localhost:9092");
+ Properties kafkaProps = new Properties();
+ kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
+
+ env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
+ .name("ClickEvent Source")
+ .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
+ @Override
+ public long extractTimestamp(final ClickEvent element) {
+ return element.getTimestamp().getTime();
+ }
+ })
+ .keyBy(ClickEvent::getPage)
+ .timeWindow(WINDOW_SIZE)
+ .aggregate(new CountingAggregator(),
+ new ClickEventStatisticsCollector())
+ .name("ClickEvent Counter")
+ .addSink(new FlinkKafkaProducer<ClickEventStatistics>(
+ outputTopic,
+ new ClickEventStatisticsSerializationSchema(),
+ kafkaProps))
+ .name("ClickEventStatistics Sink");
+
+ env.execute("Click Event Count");
+ }
+
+ private static void configureEnvironment(
+ final ParameterTool params,
+ final StreamExecutionEnvironment env) {
+
+ boolean checkpointingEnabled = params.has(CHECKPOINTING_OPTION);
+ boolean eventTimeSemantics = params.has(EVENT_TIME_OPTION);
+
+ if (checkpointingEnabled) {
+ env.enableCheckpointing(1000);
+ }
+
+ if (eventTimeSemantics) {
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ }
+
+ //disabling Operator chaining to make it easier to follow the Job in the WebUI
+ env.disableOperatorChaining();
+ }
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
new file mode 100644
index 0000000..6a5c394
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventSerializationSchema;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.playgrounds.ops.clickcount.ClickEventCount.WINDOW_SIZE;
+
+/**
+ * A generator which pushes {@link ClickEvent}s into a Kafka Topic configured via `--topic` and
+ * `--bootstrap.servers`.
+ *
+ * <p> The generator creates the same number of {@link ClickEvent}s for all pages. The delay between
+ * events is chosen such that processing time and event time roughly align. The generator always
+ * creates the same sequence of events. </p>
+ *
+ */
+public class ClickEventGenerator {
+
+ public static final int EVENTS_PER_WINDOW = 1000;
+
+ private static final List<String> pages = Arrays.asList("/help", "/index", "/shop", "/jobs", "/about", "/news");
+
+ //this calculation is only accurate as long as pages.size() * EVENTS_PER_WINDOW divides the
+ //window size
+ public static final long DELAY = WINDOW_SIZE.toMilliseconds() / pages.size() / EVENTS_PER_WINDOW;
+
+ public static void main(String[] args) throws Exception {
+
+ final ParameterTool params = ParameterTool.fromArgs(args);
+
+ String topic = params.get("topic", "input");
+
+ Properties kafkaProps = createKafkaProperties(params);
+
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps);
+
+ ClickIterator clickIterator = new ClickIterator();
+ SerializationSchema<ClickEvent> clickSerializer = new ClickEventSerializationSchema();
+
+ while (true) {
+
+ byte[] message = clickSerializer.serialize(clickIterator.next());
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, message);
+ producer.send(record);
+
+ Thread.sleep(DELAY);
+ }
+ }
+
+ private static Properties createKafkaProperties(final ParameterTool params) {
+ String brokers = params.get("bootstrap.servers", "localhost:9092");
+ Properties kafkaProps = new Properties();
+ kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+ kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+ return kafkaProps;
+ }
+
+ static class ClickIterator {
+
+ private Map<String, Long> nextTimestampPerKey;
+ private int nextPageIndex;
+
+ ClickIterator() {
+ nextTimestampPerKey = new HashMap<>();
+ nextPageIndex = 0;
+ }
+
+ ClickEvent next() {
+ String page = nextPage();
+ return new ClickEvent(nextTimestamp(page), page);
+ }
+
+ private Date nextTimestamp(String page) {
+ long nextTimestamp = nextTimestampPerKey.getOrDefault(page, 0L);
+ nextTimestampPerKey.put(page, nextTimestamp + WINDOW_SIZE.toMilliseconds() / EVENTS_PER_WINDOW);
+ return new Date(nextTimestamp);
+ }
+
+ private String nextPage() {
+ String nextPage = pages.get(nextPageIndex);
+ if (nextPageIndex == pages.size() - 1) {
+ nextPageIndex = 0;
+ } else {
+ nextPageIndex++;
+ }
+ return nextPage;
+ }
+ }
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/ClickEventStatisticsCollector.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/ClickEventStatisticsCollector.java
new file mode 100644
index 0000000..cb6af36
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/ClickEventStatisticsCollector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.functions;
+
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.Date;
+
+/**
+ * A simple {@link ProcessWindowFunction}, which wraps a count of {@link ClickEvent}s into an
+ * instance of {@link ClickEventStatistics}.
+ *
+ **/
+public class ClickEventStatisticsCollector
+ extends ProcessWindowFunction<Long, ClickEventStatistics, String, TimeWindow> {
+
+ @Override
+ public void process(
+ final String page,
+ final Context context,
+ final Iterable<Long> elements,
+ final Collector<ClickEventStatistics> out) throws Exception {
+
+ Long count = elements.iterator().next();
+
+ out.collect(new ClickEventStatistics(new Date(context.window().getStart()), new Date(context.window().getEnd()), page, count));
+ }
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/CountingAggregator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/CountingAggregator.java
new file mode 100644
index 0000000..5ebc381
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/CountingAggregator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.functions;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+
+/**
+ * An {@link AggregateFunction} which simply counts {@link ClickEvent}s.
+ *
+ */
+public class CountingAggregator implements AggregateFunction<ClickEvent, Long, Long> {
+ @Override
+ public Long createAccumulator() {
+ return 0L;
+ }
+
+ @Override
+ public Long add(final ClickEvent value, final Long accumulator) {
+ return accumulator + 1;
+ }
+
+ @Override
+ public Long getResult(final Long accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Long merge(final Long a, final Long b) {
+ return a + b;
+ }
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEvent.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEvent.java
new file mode 100644
index 0000000..3c87a1f
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEvent.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
+
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * A simple event recording a click on a {@link ClickEvent#page} at time {@link ClickEvent#timestamp}.
+ *
+ */
+public class ClickEvent {
+
+ //using java.util.Date for better readability
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
+ private Date timestamp;
+ private String page;
+
+ public ClickEvent() {
+ }
+
+ public ClickEvent(final Date timestamp, final String page) {
+ this.timestamp = timestamp;
+ this.page = page;
+ }
+
+ public Date getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(final Date timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getPage() {
+ return page;
+ }
+
+ public void setPage(final String page) {
+ this.page = page;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ClickEvent that = (ClickEvent) o;
+ return Objects.equals(timestamp, that.timestamp) && Objects.equals(page, that.page);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timestamp, page);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("ClickEvent{");
+ sb.append("timestamp=").append(timestamp);
+ sb.append(", page='").append(page).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java
new file mode 100644
index 0000000..bb47df5
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
+ *
+ */
+public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public ClickEvent deserialize(byte[] message) throws IOException {
+ return objectMapper.readValue(message, ClickEvent.class);
+ }
+
+ @Override
+ public boolean isEndOfStream(ClickEvent nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<ClickEvent> getProducedType() {
+ return TypeInformation.of(ClickEvent.class);
+ }
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
new file mode 100644
index 0000000..fab05d1
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A Kafka {@link SerializationSchema} to serialize {@link ClickEvent}s as JSON.
+ *
+ */
+public class ClickEventSerializationSchema implements SerializationSchema<ClickEvent> {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ public ClickEventSerializationSchema() {
+ super();
+ }
+
+ @Override
+ public byte[] serialize(ClickEvent message) {
+ try {
+ //if topic is null, default topic will be used
+ return objectMapper.writeValueAsBytes(message);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException("Could not serialize record: " + message, e);
+ }
+ }
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatistics.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatistics.java
new file mode 100644
index 0000000..de2d754
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatistics.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
+
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * A small wrapper class for windowed page counts.
+ *
+ */
+public class ClickEventStatistics {
+
+ //using java.util.Date for better readability
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
+ private Date windowStart;
+ //using java.util.Date for better readability
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
+ private Date windowEnd;
+ private String page;
+ private long count;
+
+ public ClickEventStatistics() {
+ }
+
+ public ClickEventStatistics(
+ final Date windowStart,
+ final Date windowEnd,
+ final String page,
+ final long count) {
+ this.windowStart = windowStart;
+ this.windowEnd = windowEnd;
+ this.page = page;
+ this.count = count;
+ }
+
+ public Date getWindowStart() {
+ return windowStart;
+ }
+
+ public void setWindowStart(final Date windowStart) {
+ this.windowStart = windowStart;
+ }
+
+ public Date getWindowEnd() {
+ return windowEnd;
+ }
+
+ public void setWindowEnd(final Date windowEnd) {
+ this.windowEnd = windowEnd;
+ }
+
+ public String getPage() {
+ return page;
+ }
+
+ public void setPage(final String page) {
+ this.page = page;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public void setCount(final long count) {
+ this.count = count;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ClickEventStatistics that = (ClickEventStatistics) o;
+ return count == that.count &&
+ Objects.equals(windowStart, that.windowStart) &&
+ Objects.equals(windowEnd, that.windowEnd) &&
+ Objects.equals(page, that.page);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(windowStart, windowEnd, page, count);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("ClickEventStatistics{");
+ sb.append("windowStart=").append(windowStart);
+ sb.append(", windowEnd=").append(windowEnd);
+ sb.append(", page='").append(page).append('\'');
+ sb.append(", count=").append(count);
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
new file mode 100644
index 0000000..40a0dbd
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.records;
+
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
+ *
+ */
+public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public byte[] serialize(ClickEventStatistics message) {
+ try {
+ //if topic is null, default topic will be used
+ return objectMapper.writeValueAsBytes(message);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException("Could not serialize record: " + message, e);
+ }
+ }
+}
diff --git a/flink-cluster-playground/conf/flink-conf.yaml b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties
similarity index 75%
copy from flink-cluster-playground/conf/flink-conf.yaml
copy to docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties
index 5c8d0e6..da32ea0 100644
--- a/flink-cluster-playground/conf/flink-conf.yaml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties
@@ -16,15 +16,8 @@
# limitations under the License.
################################################################################
-jobmanager.rpc.address: jobmanager
-blob.server.port: 6124
-query.server.port: 6125
+log4j.rootLogger=INFO, console
-taskmanager.numberOfTaskSlots: 2
-
-state.backend: filesystem
-state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
-state.savepoints.dir: file:///tmp/flink-savepoints-directory
-
-heartbeat.interval: 1000
-heartbeat.timeout: 5000
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/howto-update-playgrounds.md b/howto-update-playgrounds.md
new file mode 100644
index 0000000..2885b62
--- /dev/null
+++ b/howto-update-playgrounds.md
@@ -0,0 +1,42 @@
+
+# Versioning
+
+When updating the playgrounds we have to deal with three versions that need to be adjusted.
+
+Externally defined versions:
+
+* *Flink version*: Version of Apache Flink
+* *Flink Docker image version*: Version of the official Flink Docker image on [Docker Hub](https://hub.docker.com/_/flink)
+
+Internally defined version:
+
+* *Playground Version*: This version is used for the Java artifacts and Docker images. It follows the scheme of `<Playground-Version>-Flink-<Minor-Flink-Version>`. For example `2-Flink-1.9` denotes Version 2 of the playground for Flink 1.9.x.
+
+# Updating the playgrounds
+
+## Update playgrounds due to a new minor (or major) Flink release
+
+First of all, check that a Flink Docker image was published on [Docker Hub](https://hub.docker.com/_/flink) for the new Flink version.
+
+Update all playgrounds as follows:
+
+1. All `pom.xml`:
+ * Update the versions of all Flink dependencies
+ * Update the Maven artifact version to the new playground version (`1-Flink-1.9` if 1.9.0 is the new Flink release).
+ * Check that all Maven projects build.
+2. All `Dockerfile`:
+ * Update version of the base image to the new Flink Docker image version.
+3. `docker-compose.yaml`:
+ * Update the version of the Flink containers to the new Flink docker image version.
+ * Update the version of the custom Docker images to the new playground version.
+
+## Update a playground due to bug in a custom Docker image
+
+Whenever, you need to update a Docker image, please increment the playground version in the following places.
+
+* the artifact version of all Maven projects in all `pom.xml` files.
+* the tag (name and version) of all custom images in all `docker-compose.yaml` files.
+
+## Update a playground without updating a custom Docker image
+
+Just update the `docker-compose.yaml` file.
diff --git a/operations-playground/README.md b/operations-playground/README.md
new file mode 100644
index 0000000..5287be7
--- /dev/null
+++ b/operations-playground/README.md
@@ -0,0 +1,50 @@
+# Flink Operations Playground
+
+The Flink operations playground let's you explore and play with [Apache Flink](https://flink.apache.org)'s features to manage and operate stream processing jobs, including
+
+* Observing automatic failure recovery of an application
+* Upgrading and rescaling an application
+* Querying the runtime metrics of an application
+
+It is based on a [docker-compose](https://docs.docker.com/compose/) environment and super easy to setup.
+
+## Setup
+
+The operations playground requires a custom Docker image in addition to public images for Flink, Kafka, and ZooKeeper.
+
+The `docker-compose.yaml` file of the operations playground is located in the `operations-playground` directory. Assuming you are at the root directory of the [`flink-playgrounds`](https://github.com/apache/flink-playgrounds) repository, change to the `operations-playground` folder by running
+
+```bash
+cd operations-playground
+```
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the playground
+
+```bash
+docker-compose up -d
+```
+
+You can check if the playground was successfully started, if you can access the WebUI of the Flink cluster at [http://localhost:8081](http://localhost:8081).
+
+### Stopping the Playground
+
+To stop the playground, run the following command
+
+```bash
+docker-compose down
+```
+
+## Further instructions
+
+The playground setup and more detailed instructions are presented in the
+["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
diff --git a/flink-cluster-playground/conf/flink-conf.yaml b/operations-playground/conf/flink-conf.yaml
similarity index 100%
rename from flink-cluster-playground/conf/flink-conf.yaml
rename to operations-playground/conf/flink-conf.yaml
diff --git a/flink-cluster-playground/conf/log4j-cli.properties b/operations-playground/conf/log4j-cli.properties
similarity index 100%
rename from flink-cluster-playground/conf/log4j-cli.properties
rename to operations-playground/conf/log4j-cli.properties
diff --git a/flink-cluster-playground/conf/log4j-console.properties b/operations-playground/conf/log4j-console.properties
similarity index 100%
rename from flink-cluster-playground/conf/log4j-console.properties
rename to operations-playground/conf/log4j-console.properties
diff --git a/flink-cluster-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
similarity index 81%
rename from flink-cluster-playground/docker-compose.yaml
rename to operations-playground/docker-compose.yaml
index 7842762..d498070 100644
--- a/flink-cluster-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -19,8 +19,9 @@
version: "2.1"
services:
client:
- image: flink:1.9-scala_2.11
- command: "flink run -d -p 2 /opt/flink/examples/streaming/ClickEventCount.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
+ build: ../docker/ops-playground-image
+ image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+ command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
depends_on:
- jobmanager
- kafka
@@ -29,12 +30,12 @@ services:
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
clickevent-generator:
- image: flink:1.9-scala_2.11
- command: "java -classpath /opt/flink/examples/streaming/ClickEventCount.jar:/opt/flink/lib/* org.apache.flink.streaming.examples.windowing.clickeventcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
+ image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+ command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
depends_on:
- kafka
jobmanager:
- image: flink:1.9-scala_2.11
+ image: flink:1.8-scala_2.11
command: "jobmanager.sh start-foreground"
ports:
- 8081:8081
@@ -45,7 +46,7 @@ services:
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
- image: flink:1.9-scala_2.11
+ image: flink:1.8-scala_2.11
depends_on:
- jobmanager
command: "taskmanager.sh start-foreground"