Posted to commits@flink.apache.org by sj...@apache.org on 2020/06/23 19:29:24 UTC

[flink] branch release-1.11 updated: [FLINK-18194][walkthroughs] Document new table walkthrough

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 1ebeb7e  [FLINK-18194][walkthroughs] Document new table walkthrough
1ebeb7e is described below

commit 1ebeb7e170e37d86acfed46744853d35a694e206
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Wed Jun 10 16:08:42 2020 -0500

    [FLINK-18194][walkthroughs] Document new table walkthrough
    
    This closes #12592
---
 docs/fig/spend-report-console.png | Bin 0 -> 271422 bytes
 docs/fig/spend-report-grafana.png | Bin 0 -> 208020 bytes
 docs/try-flink/table_api.md       | 561 ++++++++++++----------------------
 docs/try-flink/table_api.zh.md    | 614 +++++++++++++-------------------------
 4 files changed, 395 insertions(+), 780 deletions(-)

diff --git a/docs/fig/spend-report-console.png b/docs/fig/spend-report-console.png
new file mode 100644
index 0000000..ade38f3
Binary files /dev/null and b/docs/fig/spend-report-console.png differ
diff --git a/docs/fig/spend-report-grafana.png b/docs/fig/spend-report-grafana.png
new file mode 100644
index 0000000..8cca47f
Binary files /dev/null and b/docs/fig/spend-report-grafana.png differ
diff --git a/docs/try-flink/table_api.md b/docs/try-flink/table_api.md
index aa78f67..89a5410 100644
--- a/docs/try-flink/table_api.md
+++ b/docs/try-flink/table_api.md
@@ -31,18 +31,18 @@ The Table API in Flink is commonly used to ease the definition of data analytics
 
 ## What Will You Be Building? 
 
-In this tutorial, you will learn how to build a continuous ETL pipeline for tracking financial transactions by account over time.
-You will start by building your report as a nightly batch job, and then migrate to a streaming pipeline.
+In this tutorial, you will learn how to build a real-time dashboard to track financial transactions by account.
+The pipeline will read data from Kafka, write the results to MySQL, and visualize them in Grafana.
 
 ## Prerequisites
 
-This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
-It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses. 
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you come from a different programming language.
+It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses.
 
 ## Help, I’m Stuck! 
 
 If you get stuck, check out the [community support resources](https://flink.apache.org/community.html).
-In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. 
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) consistently ranks as one of the most active of any Apache project and is a great way to get help quickly.
 
 ## How To Follow Along
 
@@ -50,463 +50,272 @@ If you want to follow along, you will require a computer with:
 
 * Java 8 or 11
 * Maven 
+* Docker
 
-A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless site.is_stable %}
-    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless site.is_stable %}
-    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-</div>
-</div>
-
-{% unless site.is_stable %}
+{% if site.version contains "SNAPSHOT" %}
 <p style="border-radius: 5px; padding: 5px" class="bg-danger">
-    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a>
-    If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For example:
-{% highlight bash %}
-<settings>
-  <activeProfiles>
-    <activeProfile>apache</activeProfile>
-  </activeProfiles>
-  <profiles>
-    <profile>
-      <id>apache</id>
-      <repositories>
-        <repository>
-          <id>apache-snapshots</id>
-          <url>https://repository.apache.org/content/repositories/snapshots/</url>
-        </repository>
-      </repositories>
-    </profile>
-  </profiles>
-</settings>
-{% endhighlight %}
+  <b>
+  NOTE: The Apache Flink Docker images used for this playground are only available for
+  released versions of Apache Flink.
+  </b><br>
+  Since you are currently looking at the latest SNAPSHOT
+  version of the documentation, the version references below will not work.
+  Please switch the documentation to the latest released version via the release picker, which you
+  can find on the left side below the menu.
 </p>
-{% endunless %}
+{% endif %}
 
-You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
-Maven will create a project with all the dependencies to complete this tutorial.
-After importing the project into your editor, you will see a file with the following code which you can run directly inside your IDE.
+The required configuration files are available in the [flink-playgrounds](https://github.com/apache/flink-playgrounds) repository.
+Once downloaded, open the project `flink-playgrounds/table-walkthrough` in your IDE and navigate to the file `SpendReport`. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
-
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+    "    account_id  BIGINT,\n" +
+    "    amount      BIGINT,\n" +
+    "    transaction_time TIMESTAMP(3),\n" +
+    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
+    ") WITH (\n" +
+    "    'connector' = 'kafka',\n" +
+    "    'topic'     = 'transactions',\n" +
+    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+    "    'format'    = 'csv'\n" +
+    ")");
+
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts     TIMESTAMP(3),\n" +
+    "    amount     BIGINT\n," +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "   'connector'  = 'jdbc',\n" +
+    "   'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "   'table-name' = 'spend_report',\n" +
+    "   'driver'     = 'com.mysql.jdbc.Driver',\n" +
+    "   'username'   = 'sql-demo',\n" +
+    "   'password'   = 'demo-sql'\n" +
+    ")");
+
+Table transactions = tEnv.from("transactions");
+report(transactions).executeInsert("spend_report");
 
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report");
-
-env.execute("Spend Report");
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env  = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
-
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
-
-val truncateDateToHour = new TruncateDateToHour
-
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report")
-
-env.execute("Spend Report")
-{% endhighlight %}
-</div>
-</div>
 
 ## Breaking Down The Code
 
 #### The Execution Environment
 
-The first two lines set up your `ExecutionEnvironment`.
-The execution environment is how you can set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources.
-This walkthrough begins with the batch environment since you are building a periodic batch report.
-It is then wrapped in a `BatchTableEnvironment` to have full access to the Table API.
+The first two lines set up your `TableEnvironment`.
+The table environment is how you can set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources.
+This walkthrough creates a standard table environment that uses streaming execution.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env  = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
-{% endhighlight %}
-</div>
-</div>
-
 
 #### Registering Tables
 
-Next, tables are registered in the execution environment that you can use to connect to external systems for reading and writing both batch and streaming data.
-A table source provides access to data stored in external systems; such as a database, a key-value store, a message queue, or a file system.
+Next, tables are registered in the current [catalog]({% link dev/table/catalogs.md %}); you can use them to connect to external systems for reading and writing both batch and streaming data.
+A table source provides access to data stored in external systems, such as a database, a key-value store, a message queue, or a file system.
 A table sink emits a table to an external storage system.
 Depending on the type of source and sink, they support different formats such as CSV, JSON, Avro, or Parquet.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+     "    account_id  BIGINT,\n" +
+     "    amount      BIGINT,\n" +
+     "    transaction_time TIMESTAMP(3),\n" +
+     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
+     ") WITH (\n" +
+     "    'connector' = 'kafka',\n" +
+     "    'topic'     = 'transactions',\n" +
+     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+     "    'format'    = 'csv'\n" +
+     ")");
 {% endhighlight %}
-</div>
-</div>
 
 Two tables are registered: a transaction input table and a spend report output table.
-The transactions (`transactions`) table lets us read credit card transactions, which contain account ID's (`accountId`), timestamps (`timestamp`), and US$ amounts (`amount`).
-In this tutorial, the table is backed by data generated in memory to avoid any dependencies on external systems.
-In practice, the `BoundedTransactionTableSource` may be backed by a filesystem, a database, or any other static source.
-The spend report (`spend_report`) table logs each row with log level **INFO**, instead of writing to persistent storage, so you can easily see your results.
+The transactions (`transactions`) table lets us read credit card transactions, which contain account IDs (`account_id`), timestamps (`transaction_time`), and US$ amounts (`amount`).
+The table is a logical view over a Kafka topic called `transactions` containing CSV data.
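
As a rough illustration of the wire format (the record below is made up for this example, not taken from the walkthrough), each message on the topic is simply a CSV line matching the three columns of the DDL above:

{% highlight raw %}
# hypothetical record: account_id, amount, transaction_time
5, 120, 2020-01-01 01:23:47
{% endhighlight %}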
 
-#### Registering A UDF
-
-Along with the tables, a [user-defined function]({{ site.baseurl }}/dev/table/functions/udfs.html) is registered for working with timestamps.
-This function takes a timestamp and rounds it down to the nearest hour.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts     TIMESTAMP(3),\n" +
+    "    amount     BIGINT\n," +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "    'connector'  = 'jdbc',\n" +
+    "    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "    'table-name' = 'spend_report',\n" +
+    "    'driver'     = 'com.mysql.jdbc.Driver',\n" +
+    "    'username'   = 'sql-demo',\n" +
+    "    'password'   = 'demo-sql'\n" +
+    ")");
 {% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val truncateDateToHour = new TruncateDateToHour
-{% endhighlight %}
-</div>
-</div>
+The second table, `spend_report`, stores the final results of the aggregation.
+Its underlying storage is a table in a MySQL database.
 
 #### The Query
 
 With the environment configured and tables registered, you are ready to build your first application.
-From the `TableEnvironment` you can `scan` an input table to read its rows and then write those results into an output table using `insertInto`.
+From the `TableEnvironment` you can call `from` to read the rows of an input table and then write those results into an output table using `executeInsert`.
+The `report` function is where you will implement your business logic.
+It is currently unimplemented.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report");
+Table transactions = tEnv.from("transactions");
+report(transactions).executeInsert("spend_report");
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report")
-{% endhighlight %}
-</div>
-</div>
-
-Initially, the Job reads all transactions and logs them out with log level **INFO**.
 
-#### Execute
+## Testing 
 
-Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
-You call `ExecutionEnvironment#execute` to begin the execution of your Job by giving it a name.
+The project contains a secondary testing class `SpendReportTest` that validates the logic of the report.
+It creates a table environment in batch mode. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-env.execute("Spend Report");
+EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings); 
 {% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.execute("Spend Report")
-{% endhighlight %}
-</div>
-</div>
+One of Flink's unique properties is that it provides consistent semantics across batch and streaming.
+This means you can develop and test applications in batch mode on static datasets, and deploy them to production as streaming applications.
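
As a sketch of what such a test could look like (the sample rows and the use of `fromValues` below are illustrative assumptions, not the contents of the actual `SpendReportTest`), you could run `report` over a small in-memory table and collect the bounded result:

{% highlight java %}
import static org.apache.flink.table.api.Expressions.row;

import java.time.LocalDateTime;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Batch mode: the bounded input produces one final result that is easy to assert on.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// A tiny in-memory table with the same schema as the Kafka-backed transactions table.
Table transactions = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("account_id", DataTypes.BIGINT()),
        DataTypes.FIELD("amount", DataTypes.BIGINT()),
        DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
    row(1L, 188L, LocalDateTime.parse("2020-01-01T00:12:00")),
    row(1L, 374L, LocalDateTime.parse("2020-01-01T00:47:00")));

// Collect the aggregated rows; a real test would assert on their contents.
report(transactions).execute().collect().forEachRemaining(System.out::println);
{% endhighlight %}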
 
 ## Attempt One
 
 Now with the skeleton of a Job set-up, you are ready to add some business logic.
 The goal is to build a report that shows the total spend for each account across each hour of the day.
-Just like a SQL query, Flink can select the required fields and group by your keys.
-Because the timestamp field has millisecond granularity, you can use the UDF to round it down to the nearest hour.
-Finally, select all the fields, summing the total spend per account-hour pair with the built-in `sum` [aggregate function]({{ site.baseurl }}/dev/table/functions/systemFunctions.html#aggregate-functions).
+This means the timestamp column needs to be rounded down from millisecond to hour granularity.
+
+Flink supports developing relational applications in pure [SQL]({% link dev/table/sql/index.md %}) or using the [Table API]({% link dev/table/tableApi.md %}).
+The Table API is a fluent DSL inspired by SQL that can be written in Python, Java, or Scala and supports strong IDE integration.
+Just like a SQL query, Table programs can select the required fields and group by your keys.
+Using these features, along with [built-in functions]({% link dev/table/functions/systemFunctions.md %}) like `floor` and `sum`, you can write this report.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .select("accountId, timestamp.truncateDateToHour as timestamp, amount")
-    .groupBy("accountId, timestamp")
-    .select("accountId, timestamp, amount.sum as total")
-    .insertInto("spend_report");
+public static Table report(Table transactions) {
+    return transactions.select(
+            $("account_id"),
+            $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
+            $("amount"))
+        .groupBy($("account_id"), $("log_ts"))
+        .select(
+            $("account_id"),
+            $("log_ts"),
+            $("amount").sum().as("amount"));
+}
 {% endhighlight %}
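
Because the same registered tables are also available to SQL, a roughly equivalent query could be submitted through `executeSql` instead (a sketch based on the DDL above, not code from the walkthrough project):

{% highlight java %}
// Roughly the same report expressed in SQL, grouping by the hour-truncated timestamp.
tEnv.executeSql(
    "INSERT INTO spend_report " +
    "SELECT account_id, FLOOR(transaction_time TO HOUR) AS log_ts, SUM(amount) AS amount " +
    "FROM transactions " +
    "GROUP BY account_id, FLOOR(transaction_time TO HOUR)");
{% endhighlight %}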
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .select('accountId, truncateDateToHour('timestamp) as 'timestamp, 'amount)
-    .groupBy('accountId, 'timestamp)
-    .select('accountId, 'timestamp, 'amount.sum as 'total)
-    .insertInto("spend_report")
+
+## User Defined Functions
+
+Flink contains a limited number of built-in functions, and sometimes you need to extend it with a [user-defined function]({% link dev/table/functions/udfs.md %}).
+If `floor` wasn't predefined, you could implement it yourself. 
+
+{% highlight java %}
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class MyFloor extends ScalarFunction {
+
+    public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(
+        @DataTypeHint("TIMESTAMP(3)") LocalDateTime timestamp) {
+
+        return timestamp.truncatedTo(ChronoUnit.HOURS);
+    }
+}
 {% endhighlight %}
-</div>
-</div>
 
-This query consumes all records from the `transactions` table, calculates the report, and outputs the results in an efficient, scalable manner.
+You can then quickly integrate it into your application.
 
-{% highlight raw %}
-# Query 1 output showing account id, timestamp, and amount
-
-> 1, 2019-01-01 00:00:00.0, $567.87
-> 2, 2019-01-01 00:00:00.0, $726.23
-> 1, 2019-01-01 01:00:00.0, $686.87
-> 2, 2019-01-01 01:00:00.0, $810.06
-> 1, 2019-01-01 02:00:00.0, $859.35
-> 2, 2019-01-01 02:00:00.0, $458.40
-> 1, 2019-01-01 03:00:00.0, $330.85
-> 2, 2019-01-01 03:00:00.0, $730.02
-> 1, 2019-01-01 04:00:00.0, $585.16
-> 2, 2019-01-01 04:00:00.0, $760.76
+{% highlight java %}
+public static Table report(Table transactions) {
+    return transactions.select(
+            $("account_id"),
+            call(MyFloor.class, $("transaction_time")).as("log_ts"),
+            $("amount"))
+        .groupBy($("account_id"), $("log_ts"))
+        .select(
+            $("account_id"),
+            $("log_ts"),
+            $("amount").sum().as("amount"));
+}
 {% endhighlight %}
 
+This query consumes all records from the `transactions` table, calculates the report, and outputs the results in an efficient, scalable manner.
+Running the test with this implementation will pass. 
+
 ## Adding Windows
 
 Grouping data based on time is a typical operation in data processing, especially when working with infinite streams.
-A grouping based on time is called a [window]({{ site.baseurl }}/dev/stream/operators/windows.html) and Flink offers flexible windowing semantics.
+A grouping based on time is called a [window]({% link dev/stream/operators/windows.md %}) and Flink offers flexible windowing semantics.
 The most basic type of window is called a `Tumble` window, which has a fixed size and whose buckets do not overlap.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .window(Tumble.over("1.hour").on("timestamp").as("w"))
-    .groupBy("accountId, w")
-    .select("accountId, w.start as timestamp, amount.sum")
-    .insertInto("spend_report");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .window(Tumble over 1.hour on 'timestamp as 'w)
-    .groupBy('accountId, 'w)
-    .select('accountId, 'w.start as 'timestamp, 'amount.sum)
-    .insertInto("spend_report")
+public static Table report(Table transactions) {
+    return transactions
+        .window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
+        .groupBy($("account_id"), $("log_ts"))
+        .select(
+            $("account_id"),
+            $("log_ts").start().as("log_ts"),
+            $("amount").sum().as("amount"));
+}
 {% endhighlight %}
-</div>
-</div>
 
 This defines your application as using one hour tumbling windows based on the timestamp column.
 So a row with timestamp `2019-06-01 01:23:47` is put in the `2019-06-01 01:00:00` window.
 
+
 Aggregations based on time are unique because time, as opposed to other attributes, generally moves forward in a continuous streaming application.
+Unlike `floor` and your UDF, window functions are [intrinsics](https://en.wikipedia.org/wiki/Intrinsic_function), which allows the runtime to apply additional optimizations.
 In a batch context, windows offer a convenient API for grouping records by a timestamp attribute.
 
-Running the updated query will produce identical results as before.
-
-{% highlight raw %}
-# Query 2 output showing account id, timestamp, and amount
-
-> 1, 2019-01-01 00:00:00.0, $567.87
-> 2, 2019-01-01 00:00:00.0, $726.23
-> 1, 2019-01-01 01:00:00.0, $686.87
-> 2, 2019-01-01 01:00:00.0, $810.06
-> 1, 2019-01-01 02:00:00.0, $859.35
-> 2, 2019-01-01 02:00:00.0, $458.40
-> 1, 2019-01-01 03:00:00.0, $330.85
-> 2, 2019-01-01 03:00:00.0, $730.02
-> 1, 2019-01-01 04:00:00.0, $585.16
-> 2, 2019-01-01 04:00:00.0, $760.76
-{% endhighlight %}
+Running the test with this implementation will also pass. 
 
 ## Once More, With Streaming!
 
-Because Flink's Table API offers consistent syntax and semantics for both batch and streaming, migrating from one to the other requires just two steps.
-
-The first step is to replace the batch `ExecutionEnvironment` with its streaming counterpart, `StreamExecutionEnvironment`, which creates a continuous streaming Job.
-It includes stream-specific configurations, such as the time characteristic, which when set to [event time]({{ site.baseurl }}/dev/event_time.html) guarantees consistent results even when faced with out-of-order events or a Job failure.
-This is what will be used by your `Tumble` window when grouping records.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-val tEnv = StreamTableEnvironment.create(env)
-{% endhighlight %}
-</div>
-</div>
-
-The second step is to migrate from a bounded data source to an infinite data source.
-The project comes with an `UnboundedTransactionTableSource` that continuously creates transaction events in real-time.
-Similar to the `BoundedTransactionTableSource` this table is backed by data generated in memory to avoid any dependencies on external systems.
-In practice, this table might read from a streaming source such as Apache Kafka, AWS Kinesis, or Pravega.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
-{% endhighlight %}
-</div>
-</div>
-
 And that's it, a fully functional, stateful, distributed streaming application!
-The query continuously consumes the stream of transactions, computes the hourly spendings, and emits results as soon as they are ready.
+The query continuously consumes the stream of transactions from Kafka, computes the hourly spending, and emits results as soon as they are ready.
 Since the input is unbounded, the query keeps running until it is manually stopped.
 And because the Job uses time window-based aggregations, Flink can perform specific optimizations such as state clean up when the framework knows that no more records will arrive for a particular window.
 
-{% highlight raw %}
-# Query 3 output showing account id, timestamp, and amount
-
-# These rows are calculated continuously over the hour 
-# and output immediately at the end of the hour
-> 1, 2019-01-01 00:00:00.0, $567.87
-> 2, 2019-01-01 00:00:00.0, $726.23
+The table playground is fully dockerized and runnable locally as a streaming application.
+The environment contains a Kafka topic, a continuous data generator, MySQL, and Grafana. 
 
-# Flink begins computing these rows as soon as 
-# as the first record for the window arrives
-> 1, 2019-01-01 01:00:00.0, $686.87
-> 2, 2019-01-01 01:00:00.0, $810.06
+From within the `table-walkthrough` folder, start the docker-compose script.
 
+{% highlight bash %}
+$ docker-compose build
+$ docker-compose up -d
 {% endhighlight %}
 
-## Final Application
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-package spendreport;
-
-import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
-import org.apache.flink.walkthrough.common.table.TransactionTableSource;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Tumble;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
-public class SpendReport {
+You can see information on the running job via the [Flink console](http://localhost:8082/).
 
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+![Flink Console]({% link /fig/spend-report-console.png %}){:height="400px" width="800px"}
 
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+Explore the results from inside MySQL.
 
-        tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
-        tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+{% highlight bash %}
+$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
 
-        tEnv
-            .scan("transactions")
-            .window(Tumble.over("1.hour").on("timestamp").as("w"))
-            .groupBy("accountId, w")
-            .select("accountId, w.start as timestamp, amount.sum")
-            .insertInto("spend_report");
+mysql> use sql-demo;
+Database changed
 
-        env.execute("Spend Report");
-    }
-}
+mysql> select count(*) from spend_report;
++----------+
+| count(*) |
++----------+
+|      110 |
++----------+
 {% endhighlight %}
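
Any other ad-hoc query against the table works here as well; for example, an illustrative query (not part of the walkthrough) for the total spend per account:

{% highlight bash %}
mysql> SELECT account_id, SUM(amount) AS total_spend FROM spend_report GROUP BY account_id;
{% endhighlight %}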
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-package spendreport
-
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;
-import org.apache.flink.walkthrough.common.table._
-
-object SpendReport {
-
-    def main(args: Array[String]): Unit = {
-        val env = StreamExecutionEnvironment.getExecutionEnvironment
-        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-        val tEnv = StreamTableEnvironment.create(env)
-
-        tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
-        tEnv.registerTableSink("spend_report", new SpendReportTableSink)
-
-        tEnv
-            .scan("transactions")
-            .window(Tumble over 1.hour on 'timestamp as 'w)
-            .groupBy('accountId, 'w)
-            .select('accountId, 'w.start as 'timestamp, 'amount.sum)
-            .insertInto("spend_report")
-
-        env.execute("Spend Report")
-    }
-}
-{% endhighlight %}
-</div>
-</div>
+Finally, go to [Grafana](http://localhost:3000/d/FOe0PbmGk/walkthrough?viewPanel=2&orgId=1&refresh=5s) to see the fully visualized result!
 
+![Grafana]({% link /fig/spend-report-grafana.png %}){:height="400px" width="800px"}
diff --git a/docs/try-flink/table_api.zh.md b/docs/try-flink/table_api.zh.md
index 3fc3e13..8d8bc05 100644
--- a/docs/try-flink/table_api.zh.md
+++ b/docs/try-flink/table_api.zh.md
@@ -23,492 +23,298 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Filnk 提供 Table API 作为批处理和流处理统一的关系型API,
-即查询在无界实时流或有界批数据集上以相同的语义执行,并产生相同的结果。
-Flink 中的 Table API 通常用于简化数据分析,数据流水线和 ETL 应用程序的定义。
+Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, batch data sets and produce the same results.
+The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications.
+
 * This will be replaced by the TOC
 {:toc}
 
-## 接下来你会构建什么? 
+## What Will You Be Building? 
+
+In this tutorial, you will learn how to build a real-time dashboard to track financial transactions by account.
+The pipeline will read data from Kafka, write the results to MySQL, and visualize them in Grafana.
 
-在本教程中,你将学习如何构建连续的 ETL 流水线,以便按账户随时跟踪金融交易。
-首先你将报表构建为每晚执行的批处理作业,然后迁移到流式管道。
+## Prerequisites
 
-## 先决条件
+This walk-through assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you come from a different programming language.
+It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses.
 
-本演练假设你对 Java 和 Scala 有一定的了解,但即便你使用其他编程语言,相信也可以学会。
-它还假定你熟悉基本的关系概念比如 `SELECT` 和 `GROUP BY` 子句。
+## Help, I’m Stuck! 
 
-## 救命,我被困住了!
+If you get stuck, check out the [community support resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) consistently ranks as one of the most active of any Apache project and is a great way to get help quickly.
 
-如果你被难题困住了,可以在[社区](https://flink.apache.org/community.html)寻求帮助。
-值得一提的是,Apache Flink 的[用户邮件列表](https://flink.apache.org/community.html#mailing-lists)一直是最活跃的 Apache 项目之一,也是一个快速获得帮助的好途径。
+## How To Follow Along
 
-## 如何跟进
+If you want to follow along, you will require a computer with: 
 
-如果想要继续,你的电脑需要安装:
 * Java 8 or 11
 * Maven 
+* Docker
 
-现成的 Flink Maven Archetype 可以快速创建一个具有所有必要依赖的框架项目:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless site.is_stable %}
-    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless site.is_stable %}
-    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-</div>
-</div>
-
-{% unless site.is_stable %}
+{% if site.version contains "SNAPSHOT" %}
 <p style="border-radius: 5px; padding: 5px" class="bg-danger">
-    <b>注意</b>:Maven 3.0 及更高版本,不再支持通过命令行指定仓库(-DarchetypeCatalog)。有关这个改动的详细信息,
-    请参阅 <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven 官方文档</a>
-    如果你希望使用快照仓库,则需要在 settings.xml 文件中添加一个仓库条目。例如:
-{% highlight bash %}
-<settings>
-  <activeProfiles>
-    <activeProfile>apache</activeProfile>
-  </activeProfiles>
-  <profiles>
-    <profile>
-      <id>apache</id>
-      <repositories>
-        <repository>
-          <id>apache-snapshots</id>
-          <url>https://repository.apache.org/content/repositories/snapshots/</url>
-        </repository>
-      </repositories>
-    </profile>
-  </profiles>
-</settings>
-{% endhighlight %}
+  <b>
+  NOTE: The Apache Flink Docker images used for this playground are only available for
+  released versions of Apache Flink.
+  </b><br>
+  Since you are currently looking at the latest SNAPSHOT
+  version of the documentation, the version references below will not work.
+  Please switch the documentation to the latest released version via the release picker, which you
+  can find on the left side below the menu.
 </p>
-{% endunless %}
+{% endif %}
 
-你可以根据自己的意愿修改 `groupId`、`artifactId` 和 `package` 参数。通过使用以上参数,
-Maven 将创建一个拥有全部所需依赖的项目来完成本教程。
-把项目导入编辑器后,你会看到一个包含以下代码的文件,你可以在 IDE 中直接运行它。
+The required configuration files are available in the [flink-playgrounds](https://github.com/apache/flink-playgrounds) repository.
+Once downloaded, open the project `flink-playgrounds/table-walkthrough` in your IDE and navigate to the file `SpendReport`. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
-
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
-
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report");
-
-env.execute("Spend Report");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env  = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
-
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+    "    account_id  BIGINT,\n" +
+    "    amount      BIGINT,\n" +
+    "    transaction_time TIMESTAMP(3),\n" +
+    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
+    ") WITH (\n" +
+    "    'connector' = 'kafka',\n" +
+    "    'topic'     = 'transactions',\n" +
+    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+    "    'format'    = 'csv'\n" +
+    ")");
+
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts     TIMESTAMP(3),\n" +
+    "    amount     BIGINT\n," +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "   'connector'  = 'jdbc',\n" +
+    "   'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "   'table-name' = 'spend_report',\n" +
+    "   'driver'     = 'com.mysql.jdbc.Driver',\n" +
+    "   'username'   = 'sql-demo',\n" +
+    "   'password'   = 'demo-sql'\n" +
+    ")");
+
+Table transactions = tEnv.from("transactions");
+report(transactions).executeInsert("spend_report");
 
-val truncateDateToHour = new TruncateDateToHour
-
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report")
-
-env.execute("Spend Report")
 {% endhighlight %}
-</div>
-</div>
 
-## 代码详解
+## Breaking Down The Code
 
-#### 运行环境
+#### The Execution Environment
 
-前两行设置了你的 `ExecutionEnvironment`。
-运行环境用来设置作业的属性、指定应用是批处理还是流处理,以及创建数据源。
-由于你正在建立一个定时的批处理报告,本教程以批处理环境作为开始。
-然后将其包装进 `BatchTableEnvironment` 中从而能够使用所有的 Tabel API。
+The first two lines set up your `TableEnvironment`.
+The table environment is how you can set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources.
+This walkthrough creates a standard table environment that uses the streaming runtime.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env  = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
 {% endhighlight %}
-</div>
-</div>
 
-
-#### 注册表
-
-接下来,表将会被注册到运行环境之中,这样你就可以用它们连接外部系统以读取或写入批数据或流数据。
-source 提供对存储在外部系统中的数据的访问;例如数据库、键-值存储、消息队列或文件系统。
-sink 则将表中的数据发送到外部存储系统。
-根据 source 或 sink 的类型,它们支持不同的格式,如 CSV、JSON、Avro 或 Parquet。
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+#### Registering Tables
+
+Next, tables are registered in the environment; you can use them to connect to external systems for reading and writing both batch and streaming data.
+A table source provides access to data stored in external systems, such as a database, a key-value store, a message queue, or a file system.
+A table sink emits a table to an external storage system.
+Depending on the type of source and sink, they support different formats such as CSV, JSON, Avro, or Parquet.
+
+{% highlight java %}
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+     "    account_id  BIGINT,\n" +
+     "    amount      BIGINT,\n" +
+     "    transaction_time TIMESTAMP(3),\n" +
+     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
+     ") WITH (\n" +
+     "    'connector' = 'kafka',\n" +
+     "    'topic'     = 'transactions',\n" +
+     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+     "    'format'    = 'csv'\n" +
+     ")");
 {% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+Two tables are registered: a transaction input table and a spend report output table.
+The transactions (`transactions`) table lets us read credit card transactions, which contain account IDs (`account_id`), timestamps (`transaction_time`), and US$ amounts (`amount`).
+The table is a logical view over a Kafka topic called `transactions` containing CSV data.
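
As a rough illustration of the wire format (the record below is made up for this example, not taken from the walkthrough), each message on the topic is simply a CSV line matching the three columns of the DDL above:

{% highlight raw %}
# hypothetical record: account_id, amount, transaction_time
5, 120, 2020-01-01 01:23:47
{% endhighlight %}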
+
+{% highlight java %}
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts     TIMESTAMP(3),\n" +
+    "    amount     BIGINT\n," +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "    'connector'  = 'jdbc',\n" +
+    "    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "    'table-name' = 'spend_report',\n" +
+    "    'driver'     = 'com.mysql.jdbc.Driver',\n" +
+    "    'username'   = 'sql-demo',\n" +
+    "    'password'   = 'demo-sql'\n" +
+    ")");
 {% endhighlight %}
-</div>
-</div>
 
-上例代码注册了两张表。交易表作为输入表,支出报告表作为输出表。
-我们可以从交易(`transactions`)表中读取信用卡的交易记录,其中包含了账户 ID(`accountId`)字段、时间戳(`timestamp`)字段和交易金额(`amount`)字段。
-本教程中,该表使用内存中的数据,以避免对外部系统的任何依赖。
-而在实际情况下,`BoundedTransactionTableSource` 可能来源于文件系统、数据库或任何静态数据源。
-支出报告表 `spend_report` 用 **INFO** 日志级别将表的每一行数据记录到日志,而不是写入持久化存储,所以你可以很容易地查看结果。
+The second table, `spend_report`, stores the final results of the aggregation.
+Its underlying storage is a table in a MySQL database.
 
-#### 注册 UDF
+#### The Query
 
-一个用来处理时间戳的[自定义函数]({{ site.baseurl }}/zh/dev/table/functions/udfs.html)随表一起被注册到tEnv中。
-此函数将时间戳向下舍入到最接近的小时。
+With the environment configured and tables registered, you are ready to build your first application.
+From the `TableEnvironment` you can call `from` to read the rows of an input table and then write those results into an output table using `executeInsert`.
+The `report` function is where you will implement your business logic.
+It is currently unimplemented.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+Table transactions = tEnv.from("transactions");
+report(transactions).executeInsert("spend_report");
 {% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val truncateDateToHour = new TruncateDateToHour
-{% endhighlight %}
-</div>
-</div>
-
-#### 查询
+## Testing 
 
-完成配置环境和注册表后,你已准备好构建第一个应用程序。
-从 `TableEnvironment` 中,你可以 `scan` 一个输入表读取其中的行,然后用 `insertInto` 把这些数据写到输出表中。
+The project contains a secondary testing class `SpendReportTest` that validates the logic of the report.
+It creates a table environment in batch mode. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report")
+EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings); 
 {% endhighlight %}
-</div>
-</div>
 
-最初,作业读取所有的交易记录并用 **INFO** 日志级别将其记录下来。
+One of Flink's unique properties is that it provides consistent semantics across batch and streaming.
+This means you can develop and test applications in batch mode on static datasets, and deploy them to production as streaming applications!
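
As a sketch of what such a test could look like (the sample rows and the use of `fromValues` below are illustrative assumptions, not the contents of the actual `SpendReportTest`), you could run `report` over a small in-memory table and collect the bounded result:

{% highlight java %}
import static org.apache.flink.table.api.Expressions.row;

import java.time.LocalDateTime;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Batch mode: the bounded input produces one final result that is easy to assert on.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// A tiny in-memory table with the same schema as the Kafka-backed transactions table.
Table transactions = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("account_id", DataTypes.BIGINT()),
        DataTypes.FIELD("amount", DataTypes.BIGINT()),
        DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
    row(1L, 188L, LocalDateTime.parse("2020-01-01T00:12:00")),
    row(1L, 374L, LocalDateTime.parse("2020-01-01T00:47:00")));

// Collect the aggregated rows; a real test would assert on their contents.
report(transactions).execute().collect().forEachRemaining(System.out::println);
{% endhighlight %}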
 
-#### 运行
+## Attempt One
 
-Flink 应用是延迟构建的,只有完全定义好之后才交付集群运行。
-你可以调用 `ExecutionEnvironment#execute` 来开始作业的执行并给它取一个名字。
+Now with the skeleton of a Job set-up, you are ready to add some business logic.
+The goal is to build a report that shows the total spend for each account across each hour of the day.
+This means the timestamp column needs to be rounded down from millisecond to hour granularity.
 
+Just like a SQL query, Flink can select the required fields and group by your keys.
+Using these features, along with [built-in functions]({% link dev/table/functions/systemFunctions.zh.md %}) like `floor` and `sum`, you can write this report.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-env.execute("Spend Report");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.execute("Spend Report")
-{% endhighlight %}
-</div>
-</div>
-
-## 尝试一下
-
-现在有了作业设置的框架,你就可以添加一些业务逻辑了。
-目标是建立一个报表来显示每天每小时每个账户的总支出。
-就像一个 SQL 查询一样,Flink 可以选取所需的字段并且按键分组。
-由于时间戳字段具有毫秒的粒度,你可以使用自定义函数将其舍入到最近的小时。
-最后,选取所有的字段,用内建的 `sum` [聚合函数]({{ site.baseurl }}/zh/dev/table/functions/systemFunctions.html#aggregate-functions)函数合计每一个账户每小时的支出。
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .select("accountId, timestamp.truncateDateToHour as timestamp, amount")
-    .groupBy("accountId, timestamp")
-    .select("accountId, timestamp, amount.sum as total")
-    .insertInto("spend_report");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .select('accountId, truncateDateToHour('timestamp) as 'timestamp, 'amount)
-    .groupBy('accountId, 'timestamp)
-    .select('accountId, 'timestamp, 'amount.sum as 'total)
-    .insertInto("spend_report")
-{% endhighlight %}
-</div>
-</div>
-
-这个查询处理了 `transactions` 表中的所有记录,计算报告,并以高效、可扩展的方式输出结果。
-{% highlight raw %}
-# 查询 1 的输出显示了账户 id、时间戳和消费总额。
-
-> 1, 2019-01-01 00:00:00.0, $567.87
-> 2, 2019-01-01 00:00:00.0, $726.23
-> 1, 2019-01-01 01:00:00.0, $686.87
-> 2, 2019-01-01 01:00:00.0, $810.06
-> 1, 2019-01-01 02:00:00.0, $859.35
-> 2, 2019-01-01 02:00:00.0, $458.40
-> 1, 2019-01-01 03:00:00.0, $330.85
-> 2, 2019-01-01 03:00:00.0, $730.02
-> 1, 2019-01-01 04:00:00.0, $585.16
-> 2, 2019-01-01 04:00:00.0, $760.76
+public static Table report(Table rows) {
+    return rows.select(
+            $("account_id"),
+            $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
+            $("amount"))
+        .groupBy($("account_id"), $("log_ts"))
+        .select(
+            $("account_id"),
+            $("log_ts"),
+            $("amount").sum().as("amount"));
+}
 {% endhighlight %}
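
Because the same registered tables are also available to SQL, a roughly equivalent query could be submitted through `executeSql` instead (a sketch based on the DDL above, not code from the walkthrough project):

{% highlight java %}
// Roughly the same report expressed in SQL, grouping by the hour-truncated timestamp.
tEnv.executeSql(
    "INSERT INTO spend_report " +
    "SELECT account_id, FLOOR(transaction_time TO HOUR) AS log_ts, SUM(amount) AS amount " +
    "FROM transactions " +
    "GROUP BY account_id, FLOOR(transaction_time TO HOUR)");
{% endhighlight %}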
 
-## 添加窗口
+## User Defined Functions
 
-根据时间进行分组在数据处理中是一种很常见的方式,特别是在处理无限的数据流时。
-基于时间的分组称为[窗口]({{ site.baseurl }}/zh/dev/stream/operators/windows.html) ,Flink 提供了灵活的窗口语义。
-其中最基础的是 `Tumble` window (滚动窗口),它具有固定大小且窗口之间不重叠。
+Flink contains a limited number of built-in functions, and sometimes you need to extend it with a [user-defined function]({% link dev/table/functions/udfs.zh.md %}).
+If `floor` wasn't predefined, you could implement it yourself. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .window(Tumble.over("1.hour").on("timestamp").as("w"))
-    .groupBy("accountId, w")
-    .select("accountId, w.start as timestamp, amount.sum")
-    .insertInto("spend_report");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .window(Tumble over 1.hour on 'timestamp as 'w)
-    .groupBy('accountId, 'w)
-    .select('accountId, 'w.start as 'timestamp, 'amount.sum)
-    .insertInto("spend_report")
-{% endhighlight %}
-</div>
-</div>
-
-你的应用将会使用基于时间戳字段的一小时的滚动窗口。
-因此时间戳是 `2019-06-01 01:23:47` 的行被放入 `2019-06-01 01:00:00` 这个时间窗口之中。
-
-在持续的流式应用中,基于时间的聚合结果是唯一的,因为相较于其他属性,时间通常会向前移动。
-在批处理环境中,窗口提供了一个方便的 API,用于按时间戳属性对记录进行分组。
-
-运行这个更新过的查询将会得到和之前一样的结果。
-
-{% highlight raw %}
-# 查询 2 的输出显示了账户 id、时间戳和消费总额
-
-> 1, 2019-01-01 00:00:00.0, $567.87
-> 2, 2019-01-01 00:00:00.0, $726.23
-> 1, 2019-01-01 01:00:00.0, $686.87
-> 2, 2019-01-01 01:00:00.0, $810.06
-> 1, 2019-01-01 02:00:00.0, $859.35
-> 2, 2019-01-01 02:00:00.0, $458.40
-> 1, 2019-01-01 03:00:00.0, $330.85
-> 2, 2019-01-01 03:00:00.0, $730.02
-> 1, 2019-01-01 04:00:00.0, $585.16
-> 2, 2019-01-01 04:00:00.0, $760.76
-{% endhighlight %}
-
-## 通过流处理的方式再来一次!
-
-因为 Flink 的 Table API 为批处理和流处理提供了相同的语法和语义,从一种方式迁移到另一种方式只需要两步。
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
 
-第一步是把批处理的 `ExecutionEnvironment` 替换成流处理对应的 `StreamExecutionEnvironment`,后者创建连续的流作业。
-它包含特定于流处理的配置,比如时间特性。当这个属性被设置成 [事件时间]({{ site.baseurl }}/zh/dev/event_time.html)时,它能保证即使遭遇乱序事件或者作业失败的情况也能输出一致的结果。
-滚动窗口在对数据进行分组时就运用了这个特性。
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-{% endhighlight %}
-</div>
+public class MyFloor extends ScalarFunction {
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(
+        @DataTypeHint("TIMESTAMP(3)") LocalDateTime timestamp) {
 
-val tEnv = StreamTableEnvironment.create(env)
+        return timestamp.truncatedTo(ChronoUnit.HOURS);
+    }
+}
 {% endhighlight %}
-</div>
-</div>
 
-第二步就是把有界的数据源替换成无限的数据源。
-这个项目通过 `UnboundedTransactionTableSource` 持续不断地实时生成交易事件。
-与 `BoundedTransactionTableSource` 一样,这个表也是通过在内存中生成数据从而不依赖外部系统。
-在实践中,这个表可能从一个流式数据源中读取数据,比如 Apache Kafka、AWS Kinesis 或者 Pravega。
+You can then quickly integrate it into your application.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
+public static Table report(Table rows) {
+    return rows.select(
+            $("account_id"),
+            call(MyFloor.class, $("transaction_time")).as("log_ts"),
+            $("amount"))
+        .groupBy($("account_id"), $("log_ts"))
+        .select(
+            $("account_id"),
+            $("log_ts"),
+            $("amount").sum().as("amount"));
+}
 {% endhighlight %}
-</div>
-</div>
 
-这就是一个功能齐全、有状态的分布式流式应用!
-这个查询会持续处理交易流,计算每小时的消费额,然后实时输出结果。
-由于输入是无界的,因此查询将一直运行,直到手动停止为止。
-因为这个作业使用了基于时间窗口的聚合,Flink 可以使用一些特定的优化,比如当系统知道一个特定的窗口不会再有新的数据到来,它就会对状态进行清理。
+This query consumes all records from the `transactions` table, calculates the report, and outputs the results in an efficient, scalable manner.
+Running the test with this implementation will pass. 
 
-{% highlight raw %}
-# 查询 3 的输出显示了账户 id、时间戳消费总额
+## Adding Windows
 
-> 1, 2019-01-01 00:00:00.0, $567.87
-> 2, 2019-01-01 00:00:00.0, $726.23
-
-# 这些行是在这一小时中连续计算的
-# 并在这一小时结束时立刻输出
-
-> 1, 2019-01-01 01:00:00.0, $686.87
-> 2, 2019-01-01 01:00:00.0, $810.06
-
-# 当接收到该窗口的第一条数据时
-# Flink 就开始计算了
+Grouping data based on time is a typical operation in data processing, especially when working with infinite streams.
+A grouping based on time is called a [window]({% link dev/stream/operators/windows.zh.md %}) and Flink offers flexible windowing semantics.
+The most basic type of window is called a `Tumble` window, which has a fixed size and whose buckets do not overlap.
 
+{% highlight java %}
+public static Table report(Table rows) {
+    return rows.window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
+        .groupBy($("account_id"), $("log_ts"))
+        .select(
+            $("account_id"),
+            $("log_ts").start().as("log_ts"),
+            $("amount").sum().as("amount"));
+}
 {% endhighlight %}
 
-## 最终程序
+This defines your application as using one hour tumbling windows based on the timestamp column.
+So a row with timestamp `2019-06-01 01:23:47` is put in the `2019-06-01 01:00:00` window.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-package spendreport;
 
-import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
-import org.apache.flink.walkthrough.common.table.TransactionTableSource;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Tumble;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+Aggregations based on time are unique because time, as opposed to other attributes, generally moves forward in a continuous streaming application.
+Unlike `floor` and your UDF, window functions are [intrinsics](https://en.wikipedia.org/wiki/Intrinsic_function), which allows the runtime to apply additional optimizations.
+In a batch context, windows offer a convenient API for grouping records by a timestamp attribute.
 
-public class SpendReport {
+Running the test with this implementation will also pass. 
 
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+## Once More, With Streaming!
 
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+And that's it, a fully functional, stateful, distributed streaming application!
+The query continuously consumes the stream of transactions from Kafka, computes the hourly spending, and emits results as soon as they are ready.
+Since the input is unbounded, the query keeps running until it is manually stopped.
+And because the Job uses time window-based aggregations, Flink can perform specific optimizations such as state clean up when the framework knows that no more records will arrive for a particular window.
 
-        tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
-        tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+The table playground is fully dockerized and runnable locally as a streaming application.
+The environment contains a Kafka topic, a continuous data generator, MySQL, and Grafana. 
 
-        tEnv
-            .scan("transactions")
-            .window(Tumble.over("1.hour").on("timestamp").as("w"))
-            .groupBy("accountId, w")
-            .select("accountId, w.start as timestamp, amount.sum")
-            .insertInto("spend_report");
+From within the `table-walkthrough` folder, start the docker-compose script.
 
-        env.execute("Spend Report");
-    }
-}
+{% highlight bash %}
+$ docker-compose build
+$ docker-compose up -d
 {% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-package spendreport
+You can see information on the running job via the [Flink console](http://localhost:8082/).
 
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.table.api.Tumble
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
-import org.apache.flink.walkthrough.common.table._
+![Flink Console]({% link /fig/spend-report-console.png %}){:height="400px" width="800px"}
 
-object SpendReport {
+Explore the results from inside MySQL.
 
-    def main(args: Array[String]): Unit = {
-        val env = StreamExecutionEnvironment.getExecutionEnvironment
-        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% highlight bash %}
+$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
 
-        val tEnv = StreamTableEnvironment.create(env)
+mysql> use sql-demo;
+Database changed
 
-        tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
-        tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+mysql> select count(*) from spend_report;
++----------+
+| count(*) |
++----------+
+|      110 |
++----------+
+{% endhighlight %}
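
Any other ad-hoc query against the table works here as well; for example, an illustrative query (not part of the walkthrough) for the total spend per account:

{% highlight bash %}
mysql> SELECT account_id, SUM(amount) AS total_spend FROM spend_report GROUP BY account_id;
{% endhighlight %}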
 
-        tEnv
-            .scan("transactions")
-            .window(Tumble over 1.hour on 'timestamp as 'w)
-            .groupBy('accountId, 'w)
-            .select('accountId, 'w.start as 'timestamp, 'amount.sum)
-            .insertInto("spend_report")
+Finally, go to [Grafana](http://localhost:3000/d/FOe0PbmGk/walkthrough?viewPanel=2&orgId=1&refresh=5s) to see the fully visualized result!
+
+![Grafana]({% link /fig/spend-report-grafana.png %}){:height="400px" width="800px"}
 
-        env.execute("Spend Report")
-    }
-}
-{% endhighlight %}
-</div>
-</div>