Posted to commits@flink.apache.org by sj...@apache.org on 2020/06/23 19:31:33 UTC

[flink-playgrounds] branch master updated (a27301e -> 19e8305)

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git.


    from a27301e  [FLINK-16540] Fully specify bugfix version of Flink images in docker-compose.yaml
     add 43ec2eb  [hotfix] Update gitignore
     add 724dfa0  [FLINK-18194][walkthroughs] Add transactions data generator
     add fb08741  [FLINK-18194][walkthroughs] Add spend report mysql configurations
     add fb08741  [FLINK-18194][walkthroughs] Add spend report mysql configurations
     add bc804b9  [FLINK-18194][walkthroughs] Add spend report grafana dashboard
     new 19e8305  [FLINK-18194][walkthroughs] Add table api walkthrough skeleton code

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |   2 +
 docker/data-generator/Dockerfile                   |  33 +++
 docker/data-generator/docker-entrypoint.sh         |  18 ++
 docker/data-generator/pom.xml                      | 164 ++++++++++++++
 .../flink/playground/datagen/DataGenerator.java}   |  40 ++--
 .../apache/flink/playground/datagen/Producer.java  |  93 ++++++++
 .../apache/flink/playground/datagen/Throttler.java |  72 ++++++
 .../playground/datagen/model/Transaction.java}     |  30 +--
 .../datagen/model/TransactionSerializer.java}      |  40 ++--
 .../datagen/model/TransactionSupplier.java         |  53 +++++
 docker/grafana-spend-report-init/dashboard.json    | 127 +++++++++++
 docker/grafana-spend-report-init/grafana.ini       |  22 ++
 .../provisioning/dashboards/dashboards.yml}        |  15 +-
 .../provisioning/datasources/datasource.yml}       |  20 +-
 docker/mysql-spend-report-init/create-table.sql    |  22 ++
 table-walkthrough/Dockerfile                       |  45 ++++
 table-walkthrough/docker-compose.yml               |  91 ++++++++
 table-walkthrough/pom.xml                          | 251 +++++++++++++++++++++
 .../flink/playgrounds/spendreport/SpendReport.java |  68 ++++++
 .../spendreport/UnimplementedException.java        |  30 +--
 .../src/main/resources/log4j.properties            |   2 +-
 .../playgrounds/spendreport/SpendReportTest.java   |  96 ++++++++
 .../src/test}/resources/log4j.properties           |   4 +-
 23 files changed, 1240 insertions(+), 98 deletions(-)
 create mode 100644 docker/data-generator/Dockerfile
 create mode 100755 docker/data-generator/docker-entrypoint.sh
 create mode 100644 docker/data-generator/pom.xml
 copy docker/{ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java => data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java} (51%)
 create mode 100644 docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
 create mode 100644 docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
 copy docker/{ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java => data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java} (51%)
 copy docker/{ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java => data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java} (50%)
 create mode 100644 docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java
 create mode 100644 docker/grafana-spend-report-init/dashboard.json
 create mode 100644 docker/grafana-spend-report-init/grafana.ini
 copy docker/{ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties => grafana-spend-report-init/provisioning/dashboards/dashboards.yml} (80%)
 copy docker/{ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties => grafana-spend-report-init/provisioning/datasources/datasource.yml} (77%)
 create mode 100644 docker/mysql-spend-report-init/create-table.sql
 create mode 100644 table-walkthrough/Dockerfile
 create mode 100644 table-walkthrough/docker-compose.yml
 create mode 100644 table-walkthrough/pom.xml
 create mode 100644 table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java
 copy docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java => table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java (51%)
 copy {docker/ops-playground-image/java/flink-playground-clickcountjob => table-walkthrough}/src/main/resources/log4j.properties (97%)
 create mode 100644 table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
 copy {docker/ops-playground-image/java/flink-playground-clickcountjob/src/main => table-walkthrough/src/test}/resources/log4j.properties (85%)


[flink-playgrounds] 01/01: [FLINK-18194][walkthroughs] Add table api walkthrough skeleton code

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git

commit 19e83056dbfd68899362fc37250de810cf190cc1
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Wed Jun 10 16:19:16 2020 -0500

    [FLINK-18194][walkthroughs] Add table api walkthrough skeleton code
    
    This closes #13
---
 .../flink/playground/datagen/DataGenerator.java    |   1 +
 .../apache/flink/playground/datagen/Producer.java  |  40 +---
 .../apache/flink/playground/datagen/Throttler.java |   1 +
 .../{DataGenerator.java => model/Transaction.java} |  29 +--
 .../TransactionSerializer.java}                    |  39 ++--
 .../datagen/model/TransactionSupplier.java         |  53 +++++
 table-walkthrough/Dockerfile                       |  45 ++++
 table-walkthrough/docker-compose.yml               |  91 ++++++++
 table-walkthrough/pom.xml                          | 251 +++++++++++++++++++++
 .../flink/playgrounds/spendreport/SpendReport.java |  68 ++++++
 .../spendreport/UnimplementedException.java        |  29 +--
 .../src/main/resources/log4j.properties            |  23 ++
 .../playgrounds/spendreport/SpendReportTest.java   |  96 ++++++++
 .../src/test/resources/log4j.properties            |  25 ++
 14 files changed, 697 insertions(+), 94 deletions(-)

diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
index e10c576..250f2c5 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
@@ -21,6 +21,7 @@ package org.apache.flink.playground.datagen;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/** A basic data generator for continuously writing data into a Kafka topic. */
 public class DataGenerator {
 
   private static final Logger LOG = LoggerFactory.getLogger(DataGenerator.class);
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
index e0fa817..33d68bd 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.playground.datagen;
 
-import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
-import java.util.Iterator;
 import java.util.Properties;
-import java.util.Random;
-import java.util.function.UnaryOperator;
-import java.util.stream.Stream;
+import org.apache.flink.playground.datagen.model.Transaction;
+import org.apache.flink.playground.datagen.model.TransactionSerializer;
+import org.apache.flink.playground.datagen.model.TransactionSupplier;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 
+/** Generates CSV transaction records at a throttled rate. */
 public class Producer implements Runnable, AutoCloseable {
 
   private static final DateTimeFormatter formatter =
@@ -51,36 +49,20 @@ public class Producer implements Runnable, AutoCloseable {
 
   @Override
   public void run() {
-    KafkaProducer<Long, String> producer = new KafkaProducer<>(getProperties());
+    KafkaProducer<Long, Transaction> producer = new KafkaProducer<>(getProperties());
 
     Throttler throttler = new Throttler(100);
 
-    Random generator = new Random();
-
-    Iterator<Long> accounts =
-        Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
-            .flatMap(UnaryOperator.identity())
-            .iterator();
-
-    Iterator<LocalDateTime> timestamps =
-        Stream.iterate(
-                LocalDateTime.of(2000, 1, 1, 1, 0),
-                time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
-            .iterator();
+    TransactionSupplier transactions = new TransactionSupplier();
 
     while (isRunning) {
 
-      Long account = accounts.next();
-      LocalDateTime timestamp = timestamps.next();
-      long millis = timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+      Transaction transaction = transactions.get();
 
-      String transaction =
-          String.format(
-              "%s, %s, %s",
-              account.toString(), generator.nextInt(1000), timestamp.format(formatter));
+      long millis = transaction.timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
 
-      ProducerRecord<Long, String> record =
-          new ProducerRecord<>(topic, null, millis, account, transaction);
+      ProducerRecord<Long, Transaction> record =
+          new ProducerRecord<>(topic, null, millis, transaction.accountId, transaction);
       producer.send(record);
 
       try {
@@ -104,7 +86,7 @@ public class Producer implements Runnable, AutoCloseable {
     props.put(ProducerConfig.ACKS_CONFIG, "all");
     props.put(ProducerConfig.RETRIES_CONFIG, 0);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TransactionSerializer.class);
 
     return props;
   }
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
index f704c80..548d300 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Throttler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.playground.datagen;
 
+/** A data throttler that controls the rate at which data is written out to Kafka. */
 final class Throttler {
 
   private final long throttleBatchSize;
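 
Only the new Javadoc of the Throttler is shown in this diff; its body is unchanged and not included here. For readers following along, below is a minimal sketch of a records-per-second throttler under that assumption. The class and field names are illustrative, and the field throttleBatchSize visible above suggests the real Throttler in docker/data-generator throttles in batches rather than exactly as sketched.

    // Illustrative sketch only; not the actual Throttler from this repository.
    final class SimpleThrottler {

      private final long maxPerSecond;
      private long windowStart = System.currentTimeMillis();
      private long emitted = 0;

      SimpleThrottler(long maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
      }

      /** Blocks until one more record can be sent without exceeding the rate. */
      void throttle() throws InterruptedException {
        long now = System.currentTimeMillis();
        if (now - windowStart >= 1000) {
          // A new one-second window has started; reset the counter.
          windowStart = now;
          emitted = 0;
        }
        if (emitted >= maxPerSecond) {
          // The current window is exhausted; sleep until it ends.
          Thread.sleep(1000 - (now - windowStart));
          windowStart = System.currentTimeMillis();
          emitted = 0;
        }
        emitted++;
      }
    }
 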
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java
similarity index 55%
copy from docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
copy to docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java
index e10c576..42fcb41 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/Transaction.java
@@ -16,30 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.playground.datagen;
+package org.apache.flink.playground.datagen.model;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.time.LocalDateTime;
 
-public class DataGenerator {
+/** A simple financial transaction. */
+public class Transaction {
+  public long accountId;
 
-  private static final Logger LOG = LoggerFactory.getLogger(DataGenerator.class);
+  public int amount;
 
-  private static final String KAFKA = "kafka:9092";
-
-  private static final String TOPIC = "transactions";
-
-  public static void main(String[] args) {
-    Producer producer = new Producer(KAFKA, TOPIC);
-
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  LOG.info("Shutting down");
-                  producer.close();
-                }));
-
-    producer.run();
-  }
+  public LocalDateTime timestamp;
 }
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java
similarity index 51%
copy from docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
copy to docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java
index e10c576..b57b5f0 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSerializer.java
@@ -16,30 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.playground.datagen;
+package org.apache.flink.playground.datagen.model;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
 
-public class DataGenerator {
+/** Serializes a {@link Transaction} into a CSV record. */
+public class TransactionSerializer implements Serializer<Transaction> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(DataGenerator.class);
+  private static final DateTimeFormatter formatter =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
-  private static final String KAFKA = "kafka:9092";
+  @Override
+  public void configure(Map<String, ?> map, boolean b) {}
 
-  private static final String TOPIC = "transactions";
+  @Override
+  public byte[] serialize(String s, Transaction transaction) {
+    String csv =
+        String.format(
+            "%s, %s, %s",
+            transaction.accountId, transaction.amount, transaction.timestamp.format(formatter));
 
-  public static void main(String[] args) {
-    Producer producer = new Producer(KAFKA, TOPIC);
-
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  LOG.info("Shutting down");
-                  producer.close();
-                }));
-
-    producer.run();
+    return csv.getBytes();
   }
+
+  @Override
+  public void close() {}
 }
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java
new file mode 100644
index 0000000..6ddf9a6
--- /dev/null
+++ b/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/model/TransactionSupplier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playground.datagen.model;
+
+import java.time.LocalDateTime;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+/** A supplier that generates an arbitrary transaction. */
+public class TransactionSupplier implements Supplier<Transaction> {
+
+  private final Random generator = new Random();
+
+  private final Iterator<Long> accounts =
+      Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
+          .flatMap(UnaryOperator.identity())
+          .iterator();
+
+  private final Iterator<LocalDateTime> timestamps =
+      Stream.iterate(
+              LocalDateTime.of(2000, 1, 1, 1, 0),
+              time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
+          .iterator();
+
+  @Override
+  public Transaction get() {
+    Transaction transaction = new Transaction();
+    transaction.accountId = accounts.next();
+    transaction.amount = generator.nextInt(1000);
+    transaction.timestamp = timestamps.next();
+
+    return transaction;
+  }
+}
diff --git a/table-walkthrough/Dockerfile b/table-walkthrough/Dockerfile
new file mode 100644
index 0000000..86cf2df
--- /dev/null
+++ b/table-walkthrough/Dockerfile
@@ -0,0 +1,45 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM maven:3.6-jdk-8-slim AS builder
+
+COPY ./pom.xml /opt/pom.xml
+COPY ./src /opt/src
+RUN cd /opt; mvn clean install -Dmaven.test.skip
+
+FROM flink:1.11-SNAPSHOT-scala_2.11
+
+# Download connector libraries for snapshot version
+RUN wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/1.11-SNAPSHOT/flink-sql-connector-kafka_2.11-1.11-20200610.034108-152.jar; \
+    wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-connector-jdbc_2.11/1.11-SNAPSHOT/flink-connector-jdbc_2.11-1.11-20200610.033814-8.jar; \
+    wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-csv/1.11-SNAPSHOT/flink-csv-1.11-20200610.033438-153.jar; \
+    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;
+
+
+# Download connector libraries
+#RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/${FLINK_VERSION}/flink-sql-connector-kafka_2.11-${FLINK_VERSION}.jar; \
+#    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-jdbc_2.11/${FLINK_VERSION}/flink-jdbc_2.11-${FLINK_VERSION}.jar; \
+#    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar; \
+#    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;
+
+COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar
+
+RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; \
+    echo "pipeline.object-reuse: true" >> /opt/flink/conf/flink-conf.yaml; \
+    echo "pipeline.time-characteristic: EventTime" >> /opt/flink/conf/flink-conf.yaml; \
+    echo "taskmanager.memory.jvm-metaspace.size: 256m" >> /opt/flink/conf/flink-conf.yaml;
diff --git a/table-walkthrough/docker-compose.yml b/table-walkthrough/docker-compose.yml
new file mode 100644
index 0000000..2388af4
--- /dev/null
+++ b/table-walkthrough/docker-compose.yml
@@ -0,0 +1,91 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+version: '2.1'
+services:
+  jobmanager:
+    image: apache/flink-table-walkthrough:1-FLINK-1.11-scala_2.11
+    build: .
+    hostname: "jobmanager"
+    expose:
+      - "6123"
+    ports:
+      - "8082:8081"
+    command: standalone-job
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS=jobmanager
+    depends_on:
+      - kafka
+      - mysql
+  taskmanager:
+    image: apache/flink-table-walkthrough:1-FLINK-1.11-scala_2.11
+    build: .
+    expose:
+      - "6121"
+      - "6122"
+    depends_on:
+      - jobmanager
+    command: taskmanager
+    links:
+      - jobmanager:jobmanager
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS=jobmanager
+  zookeeper:
+    image: wurstmeister/zookeeper:3.4.6
+    ports:
+      - "2181:2181"
+  kafka:
+    image: wurstmeister/kafka:2.12-2.2.1
+    ports:
+      - "9092:9092"
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ADVERTISED_HOST_NAME: "kafka"
+      KAFKA_ADVERTISED_PORT: "9092"
+      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_CREATE_TOPICS: "transactions:1:1"
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+  data-generator:
+      image: apache/data-generator:1
+      build: ../docker/data-generator
+      depends_on:
+        - kafka
+  mysql:
+    image: mysql:8.0.19
+    command: --default-authentication-plugin=mysql_native_password --secure_file_priv=/data
+    environment:
+      MYSQL_USER: "sql-demo"
+      MYSQL_PASSWORD: "demo-sql"
+      MYSQL_DATABASE: "sql-demo"
+      MYSQL_RANDOM_ROOT_PASSWORD: "yes"
+    volumes:
+      - ../docker/mysql-spend-report-init:/docker-entrypoint-initdb.d
+      - ./data:/data
+  grafana:
+    image: grafana/grafana
+    ports:
+      - "3000:3000"
+    depends_on:
+      - mysql
+    volumes:
+      - ../docker/grafana-spend-report-init/provisioning/:/etc/grafana/provisioning/
+      - ../docker/grafana-spend-report-init/dashboard.json:/etc/grafana/dashboard.json
+      - ../docker/grafana-spend-report-init/grafana.ini:/etc/grafana/grafana.ini
diff --git a/table-walkthrough/pom.xml b/table-walkthrough/pom.xml
new file mode 100644
index 0000000..dda85c8
--- /dev/null
+++ b/table-walkthrough/pom.xml
@@ -0,0 +1,251 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>spend-report</artifactId>
+	<version>1.0.0</version>
+	<packaging>jar</packaging>
+
+	<name>Flink Walkthrough Table Java</name>
+	<url>https://flink.apache.org</url>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flink.version>1.11-SNAPSHOT</flink.version>
+		<java.version>1.8</java.version>
+		<scala.binary.version>2.11</scala.binary.version>
+		<maven.compiler.source>${java.version}</maven.compiler.source>
+		<maven.compiler.target>${java.version}</maven.compiler.target>
+    </properties>
+
+	<repositories>
+		<repository>
+			<id>apache.snapshots</id>
+			<name>Apache Development Snapshot Repository</name>
+			<url>https://repository.apache.org/content/repositories/snapshots/</url>
+			<releases>
+				<enabled>false</enabled>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+		</dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Add logging framework, to produce console output when running in the IDE. -->
+        <!-- These dependencies are excluded from the application JAR by default. -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.7</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>runtime</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <!-- Java Compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.flink.playgrounds.spendreport.SpendReport</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+
+        <pluginManagement>
+            <plugins>
+
+                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-shade-plugin</artifactId>
+                                        <versionRange>[3.0.0,)</versionRange>
+                                        <goals>
+                                            <goal>shade</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-compiler-plugin</artifactId>
+                                        <versionRange>[3.1,)</versionRange>
+                                        <goals>
+                                            <goal>testCompile</goal>
+                                            <goal>compile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.rat</groupId>
+                    <artifactId>apache-rat-plugin</artifactId>
+                    <version>0.13</version>
+                    <inherited>false</inherited>
+                    <executions>
+                        <execution>
+                            <phase>verify</phase>
+                            <goals>
+                                <goal>check</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                    <configuration>
+                        <excludes>
+                            <!-- Additional files like .gitignore etc.-->
+                            <exclude>**/.*/**</exclude>
+                            <exclude>**/*.prefs</exclude>
+                            <exclude>**/*.log</exclude>
+                            <!-- Administrative files in the main trunk. -->
+                            <exclude>**/README.md</exclude>
+                            <exclude>**/CODE_OF_CONDUCT.md</exclude>
+                            <exclude>.github/**</exclude>
+                            <!-- IDE files. -->
+                            <exclude>**/*.iml</exclude>
+                            <!-- Generated content -->
+                            <exclude>**/target/**</exclude>
+                            <exclude>**/dependency-reduced-pom.xml</exclude>
+                        </excludes>
+                    </configuration>
+                </plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+</project>
diff --git a/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java
new file mode 100644
index 0000000..1a8cb83
--- /dev/null
+++ b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/SpendReport.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.spendreport;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.Tumble;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+public class SpendReport {
+
+    public static Table report(Table transactions) {
+        throw new UnimplementedException();
+    }
+
+    public static void main(String[] args) throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+
+        tEnv.executeSql("CREATE TABLE transactions (\n" +
+                "    account_id  BIGINT,\n" +
+                "    amount      BIGINT,\n" +
+                "    transaction_time TIMESTAMP(3),\n" +
+                "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
+                ") WITH (\n" +
+                "    'connector' = 'kafka',\n" +
+                "    'topic'     = 'transactions',\n" +
+                "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+                "    'format'    = 'csv'\n" +
+                ")");
+
+        tEnv.executeSql("CREATE TABLE spend_report (\n" +
+                "    account_id BIGINT,\n" +
+                "    log_ts     TIMESTAMP(3),\n" +
+                "    amount     BIGINT\n," +
+                "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+                ") WITH (\n" +
+                "  'connector'  = 'jdbc',\n" +
+                "  'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+                "  'table-name' = 'spend_report',\n" +
+                "  'driver'     = 'com.mysql.jdbc.Driver',\n" +
+                "  'username'   = 'sql-demo',\n" +
+                "  'password'   = 'demo-sql'\n" +
+                ")");
+
+        Table transactions = tEnv.from("transactions");
+        report(transactions).executeInsert("spend_report");
+    }
+}
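 
SpendReport.report(...) is deliberately left throwing UnimplementedException; filling it in is the exercise of the walkthrough. As a hedged reference only (not part of this commit), one implementation consistent with the expectations in SpendReportTest further down (a one-hour tumbling window over transaction_time, summing amount per account) could look like the sketch below, reusing the imports already present in the file:

    public static Table report(Table transactions) {
        return transactions
            // One-hour tumbling window on the event-time column.
            .window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
            .groupBy($("account_id"), $("log_ts"))
            .select(
                $("account_id"),
                // Report on the window start timestamp.
                $("log_ts").start().as("log_ts"),
                $("amount").sum().as("amount"));
    }
 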
diff --git a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java
similarity index 55%
copy from docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
copy to table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java
index e10c576..e1c986f 100644
--- a/docker/data-generator/src/main/java/org/apache/flink/playground/datagen/DataGenerator.java
+++ b/table-walkthrough/src/main/java/org/apache/flink/playgrounds/spendreport/UnimplementedException.java
@@ -16,30 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.playground.datagen;
+package org.apache.flink.playgrounds.spendreport;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public class UnimplementedException extends RuntimeException {
 
-public class DataGenerator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DataGenerator.class);
-
-  private static final String KAFKA = "kafka:9092";
-
-  private static final String TOPIC = "transactions";
-
-  public static void main(String[] args) {
-    Producer producer = new Producer(KAFKA, TOPIC);
-
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  LOG.info("Shutting down");
-                  producer.close();
-                }));
-
-    producer.run();
-  }
+    public UnimplementedException() {
+        super("This method has not yet been implemented");
+    }
 }
diff --git a/table-walkthrough/src/main/resources/log4j.properties b/table-walkthrough/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e26ea17
--- /dev/null
+++ b/table-walkthrough/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java b/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
new file mode 100644
index 0000000..208f6e9
--- /dev/null
+++ b/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.spendreport;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * A unit test of the spend report.
+ * If this test passes then the business
+ * logic is correct.
+ */
+public class SpendReportTest {
+
+    private static final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);
+    
+    @Test
+    public void testReport() {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+
+        Table transactions =
+                tEnv.fromValues(
+                        DataTypes.ROW(
+                                DataTypes.FIELD("account_id", DataTypes.BIGINT()),
+                                DataTypes.FIELD("amount", DataTypes.BIGINT()),
+                                DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
+                        Row.of(1, 188, DATE_TIME.plusMinutes(12)),
+                        Row.of(2, 374, DATE_TIME.plusMinutes(47)),
+                        Row.of(3, 112, DATE_TIME.plusMinutes(36)),
+                        Row.of(4, 478, DATE_TIME.plusMinutes(3)),
+                        Row.of(5, 208, DATE_TIME.plusMinutes(8)),
+                        Row.of(1, 379, DATE_TIME.plusMinutes(53)),
+                        Row.of(2, 351, DATE_TIME.plusMinutes(32)),
+                        Row.of(3, 320, DATE_TIME.plusMinutes(31)),
+                        Row.of(4, 259, DATE_TIME.plusMinutes(19)),
+                        Row.of(5, 273, DATE_TIME.plusMinutes(42)));
+
+        try {
+            TableResult results = SpendReport.report(transactions).execute();
+
+            MatcherAssert.assertThat(
+                    materialize(results),
+                    Matchers.containsInAnyOrder(
+                            Row.of(1L, DATE_TIME, 567L),
+                            Row.of(2L, DATE_TIME, 725L),
+                            Row.of(3L, DATE_TIME, 432L),
+                            Row.of(4L, DATE_TIME, 737L),
+                            Row.of(5L, DATE_TIME, 481L)));
+        } catch (UnimplementedException e) {
+            Assume.assumeNoException("The walkthrough has not been implemented", e);
+        }
+    }
+    
+    private static List<Row> materialize(TableResult results) {
+        try (CloseableIterator<Row> resultIterator = results.collect()) {
+            return StreamSupport
+                    .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to materialize results", e);
+        }
+    }
+}
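 
For anyone checking the expected values by hand: every input row's transaction_time falls inside the single one-hour window that starts at DATE_TIME (2020-01-01 00:00), so each account's expected amount is simply the sum of its two input amounts:

    account 1: 188 + 379 = 567
    account 2: 374 + 351 = 725
    account 3: 112 + 320 = 432
    account 4: 478 + 259 = 737
    account 5: 208 + 273 = 481
 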
diff --git a/table-walkthrough/src/test/resources/log4j.properties b/table-walkthrough/src/test/resources/log4j.properties
new file mode 100644
index 0000000..5cae07b
--- /dev/null
+++ b/table-walkthrough/src/test/resources/log4j.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+log4j.logger.org.apache.flink.streaming.api.operators.collect.CollectResultFetcher=OFF
+log4j.logger.org.apache.flink.streaming.api.operators.collect.CollectSinkFunction=OFF
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n