You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2019/07/29 20:47:11 UTC

[flink] branch master updated: [FLINK-12747][docs] Getting Started - Table Api Walkthrough

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f4943dd  [FLINK-12747][docs] Getting Started - Table Api Walkthrough
f4943dd is described below

commit f4943dd06b2ff7da899812a2aaa2e0b24c2afc01
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Wed Jun 26 14:48:38 2019 -0500

    [FLINK-12747][docs] Getting Started - Table Api Walkthrough
---
 docs/getting-started/examples/index.md             |   2 +-
 docs/getting-started/examples/index.zh.md          |   2 +-
 docs/getting-started/tutorials/index.md            |   2 +-
 docs/getting-started/tutorials/index.zh.md         |   2 +-
 .../{tutorials => walkthroughs}/index.md           |   8 +-
 .../{tutorials => walkthroughs}/index.zh.md        |   8 +-
 docs/getting-started/walkthroughs/table_api.md     | 494 +++++++++++++++++++++
 docs/getting-started/walkthroughs/table_api.zh.md  | 494 +++++++++++++++++++++
 flink-end-to-end-tests/run-nightly-tests.sh        |   3 +
 .../test-scripts/test_table_walkthroughs.sh        |  74 +++
 .../flink-walkthrough-common/pom.xml               |  59 +++
 .../walkthrough/common/entity/Transaction.java     |  93 ++++
 .../common/sink/LoggerOutputFormat.java            |  50 +++
 .../common/source/TransactionIterator.java         | 130 ++++++
 .../common/source/TransactionRowInputFormat.java   |  60 +++
 .../common/source/TransactionSource.java           |  65 +++
 .../table/BoundedTransactionTableSource.java       |  55 +++
 .../common/table/SpendReportTableSink.java         |  95 ++++
 .../common/table/TruncateDateToHour.java           |  46 ++
 .../table/UnboundedTransactionTableSource.java     |  89 ++++
 .../flink-walkthrough-table-java/pom.xml           |  26 +-
 .../META-INF/maven/archetype-metadata.xml          |  25 +-
 .../src/main/resources/archetype-resources/pom.xml | 263 +++++++++++
 .../src/main/java/SpendReport.java                 |  45 ++
 .../src/main/resources/log4j.properties            |  24 +
 .../flink-walkthrough-table-scala/pom.xml          |  26 +-
 .../META-INF/maven/archetype-metadata.xml          |  25 +-
 .../src/main/resources/archetype-resources/pom.xml | 300 +++++++++++++
 .../src/main/resources/log4j.properties            |  24 +
 .../src/main/scala/SpendReport.scala               |  41 ++
 flink-walkthroughs/pom.xml                         |  95 ++++
 pom.xml                                            |   1 +
 32 files changed, 2686 insertions(+), 40 deletions(-)

diff --git a/docs/getting-started/examples/index.md b/docs/getting-started/examples/index.md
index 6810bbd..d4d315c 100644
--- a/docs/getting-started/examples/index.md
+++ b/docs/getting-started/examples/index.md
@@ -3,7 +3,7 @@ title: Examples
 nav-id: examples
 nav-title: '<i class="fa fa-file-code-o title appetizer" aria-hidden="true"></i> Examples'
 nav-parent_id: getting-started
-nav-pos: 2
+nav-pos: 3
 nav-show_overview: true
 ---
 <!--
diff --git a/docs/getting-started/examples/index.zh.md b/docs/getting-started/examples/index.zh.md
index 3bca74a..e0925d8 100644
--- a/docs/getting-started/examples/index.zh.md
+++ b/docs/getting-started/examples/index.zh.md
@@ -3,7 +3,7 @@ title: 示例
 nav-id: examples
 nav-title: '<i class="fa fa-file-code-o title appetizer" aria-hidden="true"></i> 示例'
 nav-parent_id: getting-started
-nav-pos: 2
+nav-pos: 3
 nav-show_overview: true
 ---
 <!--
diff --git a/docs/getting-started/tutorials/index.md b/docs/getting-started/tutorials/index.md
index 9b0c8f4..f283489 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/docs/getting-started/tutorials/index.md
@@ -3,7 +3,7 @@ title: "Tutorials"
 nav-id: tutorials
 nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
 nav-parent_id: getting-started
-nav-pos: 1
+nav-pos: 2
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/tutorials/index.zh.md b/docs/getting-started/tutorials/index.zh.md
index 540a6c4..ac6715b 100644
--- a/docs/getting-started/tutorials/index.zh.md
+++ b/docs/getting-started/tutorials/index.zh.md
@@ -3,7 +3,7 @@ title: "教程"
 nav-id: tutorials
 nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> 教程'
 nav-parent_id: getting-started
-nav-pos: 1
+nav-pos: 2
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/tutorials/index.md b/docs/getting-started/walkthroughs/index.md
similarity index 84%
copy from docs/getting-started/tutorials/index.md
copy to docs/getting-started/walkthroughs/index.md
index 9b0c8f4..ae2f536 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/docs/getting-started/walkthroughs/index.md
@@ -1,7 +1,7 @@
 ---
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
+title: "Code Walkthroughs"
+nav-id: walkthroughs
+nav-title: '<i class="fa fa-bolt title appetizer" aria-hidden="true"></i> Code Walkthroughs'
 nav-parent_id: getting-started
 nav-pos: 1
 ---
@@ -22,4 +22,4 @@ software distributed under the License is distributed on an
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
--->
+-->
\ No newline at end of file
diff --git a/docs/getting-started/tutorials/index.zh.md b/docs/getting-started/walkthroughs/index.zh.md
similarity index 84%
copy from docs/getting-started/tutorials/index.zh.md
copy to docs/getting-started/walkthroughs/index.zh.md
index 540a6c4..ae2f536 100644
--- a/docs/getting-started/tutorials/index.zh.md
+++ b/docs/getting-started/walkthroughs/index.zh.md
@@ -1,7 +1,7 @@
 ---
-title: "教程"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> 教程'
+title: "Code Walkthroughs"
+nav-id: walkthroughs
+nav-title: '<i class="fa fa-bolt title appetizer" aria-hidden="true"></i> Code Walkthroughs'
 nav-parent_id: getting-started
 nav-pos: 1
 ---
@@ -22,4 +22,4 @@ software distributed under the License is distributed on an
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
--->
+-->
\ No newline at end of file
diff --git a/docs/getting-started/walkthroughs/table_api.md b/docs/getting-started/walkthroughs/table_api.md
new file mode 100644
index 0000000..878fb54
--- /dev/null
+++ b/docs/getting-started/walkthroughs/table_api.md
@@ -0,0 +1,494 @@
+---
+title: "Table API"
+nav-id: tableapiwalkthrough
+nav-title: 'Table API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, batch data sets and produce the same results.
+The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Will You Be Building? 
+
+In this tutorial, you will learn how to build a continuous ETL pipeline for tracking financial transactions by account over time.
+You will start by building your report as a nightly batch job, and then migrate to a streaming pipeline.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses. 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. 
+
+## How To Follow Along
+
+If you want to follow along, you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=spend-report \
+    -DartifactId=spend-report \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=spend-report \
+    -DartifactId=spend-report \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven's official document</a>
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
+Maven will create a project with all the dependencies to complete this tutorial.
+After importing the project into your editor, you will see a file with the following code which you can run directly inside your IDE.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+    .scan("transactions")
+    .insertInto("spend_report");
+
+env.execute("Spend Report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env  = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = BatchTableEnvironment.create(env)
+
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+val truncateDateToHour = new TruncateDateToHour
+
+tEnv
+    .scan("transactions")
+    .insertInto("spend_report")
+
+env.execute("Spend Report")
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down The Code
+
+#### The Execution Environment
+
+The first two lines set up your `ExecutionEnvironment`.
+The execution environment is how you can set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources.
+This walkthrough begins with the batch environment since you are building a periodic batch report.
+It is then wrapped in a `BatchTableEnvironment` to have full access to the Table API.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env  = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = BatchTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Registering Tables
+
+Next, tables are registered in the execution environment that you can use to connect to external systems for reading and writing both batch and streaming data.
+A table source provides access to data stored in external systems; such as a database, a key-value store, a message queue, or a file system.
+A table sink emits a table to an external storage system.
+Depending on the type of source and sink, they support different formats such as CSV, JSON, Avro, or Parquet.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+{% endhighlight %}
+</div>
+</div>
+
+Two tables are registered; a transaction input table, and a spend report output table.
+The transactions (`transactions`) table lets us read credit card transactions, which contain account ID's (`accountId`), timestamps (`timestamp`), and US$ amounts (`amount`).
+In this tutorial, the table is backed by data generated in memory to avoid any dependencies on external systems.
+In practice, the `BoundedTransactionTableSource` may be backed by a filesystem, a database, or any other static source.
+The spend report (`spend_report`) table logs each row with log level **INFO**, instead of writing to persistent storage, so you can easily see your results.
+
+#### Registering A UDF
+
+Along with the tables, a [user-defined function]({{ site.baseurl }}/dev/table/udfs.html) is registered for working with timestamps.
+This function takes a timestamp and rounds it down to the nearest hour.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val truncateDateToHour = new TruncateDateToHour
+{% endhighlight %}
+</div>
+</div>
+
+#### The Query
+
+With the environment configured and tables registered, you are ready to build your first application.
+From the `TableEnvironment` you can `scan` an input table to read its rows and then write those results into an output table using `insertInto`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+    .scan("transactions")
+    .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+    .scan("transactions")
+    .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+Initially, the Job reads all transactions and logs them out with log level **INFO**.
+
+#### Execute
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+You call `ExecutionEnvironment#execute` to begin the execution of your Job by giving it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Spend Report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Spend Report")
+{% endhighlight %}
+</div>
+</div>
+
+## Attempt One
+
+Now with the skeleton of a Job set-up, you are ready to add some business logic.
+The goal is to build a report that shows the total spend for each account across each hour of the day.
+Just like a SQL query, Flink can select the required fields and group by your keys.
+Because the timestamp field has millisecond granularity, you can use the UDF to round it down to the nearest hour.
+Finally, select all the fields, summing the total spend per account-hour pair with the built-in `sum` [aggregate function]({{ site.baseurl }}/dev/table/functions.html#aggregate-functions).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+    .scan("transactions")
+    .select("accountId, timestamp.truncateDateToHour as timestamp, amount")
+    .groupBy("accountId, timestamp")
+    .select("accountId, timestamp, amount.sum as total")
+    .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+    .scan("transactions")
+    .select('accountId, truncateDateToHour('timestamp) as 'timestamp, 'amount)
+    .groupBy('accountId, 'timestamp)
+    .select('accountId, 'timestamp, 'amount.sum as 'total)
+    .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+This query consumes all records from the `transactions` table, calculates the report, and outputs the results in an efficient, scalable manner.
+
+{% highlight raw %}
+# Query 1 output showing account id, timestamp, and amount
+
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+> 1, 2019-01-01 02:00:00.0, $859.35
+> 2, 2019-01-01 02:00:00.0, $458.40
+> 1, 2019-01-01 03:00:00.0, $330.85
+> 2, 2019-01-01 03:00:00.0, $730.02
+> 1, 2019-01-01 04:00:00.0, $585.16
+> 2, 2019-01-01 04:00:00.0, $760.76
+{% endhighlight %}
+
+## Adding Windows
+
+Grouping data based on time is a typical operation in data processing, especially when working with infinite streams.
+A grouping based on time is called a [window]({{ site.baseurl }} /dev/stream/operators/windows.html) and Flink offers flexible windowing semantics.
+The most basic type of window is called a `Tumble` window, which has a fixed size and whose buckets do not overlap.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+    .scan("transactions")
+    .window(Tumble.over("1.hour").on("timestamp").as("w"))
+    .groupBy("accountId, w")
+    .select("accountId, w.start as timestamp, amount.sum")
+    .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+    .scan("transactions")
+    .window(Tumble over 1.hour on 'timestamp as 'w)
+    .groupBy('accountId, 'w)
+    .select('accountId, 'w.start as 'timestamp, 'amount.sum)
+    .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+This defines your application as using one hour tumbling windows based on the timestamp column.
+So a row with timestamp `2019-06-01 01:23:47` is put in the `2019-06-01 01:00:00` window.
+
+Aggregations based on time are unique because time, as opposed to other attributes, generally moves forward in a continuous streaming application.
+In a batch context, windows offer a convenient API for grouping records by a timestamp attribute.
+
+Running the updated query will produce identical results as before.
+
+{% highlight raw %}
+# Query 2 output showing account id, timestamp, and amount
+
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+> 1, 2019-01-01 02:00:00.0, $859.35
+> 2, 2019-01-01 02:00:00.0, $458.40
+> 1, 2019-01-01 03:00:00.0, $330.85
+> 2, 2019-01-01 03:00:00.0, $730.02
+> 1, 2019-01-01 04:00:00.0, $585.16
+> 2, 2019-01-01 04:00:00.0, $760.76
+{% endhighlight %}
+
+## Once More, With Streaming!
+
+Because Flink's Table API offers consistent syntax and semantics for both batch and streaming, migrating from one to the other requires just two steps.
+
+The first step is to replace the batch `ExecutionEnvironment` with its streaming counterpart, `StreamExecutionEnvironment`, which creates a continuous streaming Job.
+It includes stream-specific configurations, such as the time characteristic, which when set to [event time]({{ site.baseurl }}/dev/event_time.html) guarantees consistent results even when faced with out-of-order events or a Job failure.
+This is what will be used by your `Tumble` window when grouping records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val tEnv = StreamTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+</div>
+
+The second step is to migrate from a bounded data source to an infinite data source.
+The project comes with an `UnboundedTransactionTableSource` that continuously creates transaction events in real-time.
+Similar to the `BoundedTransactionTableSource` this table is backed by data generated in memory to avoid any dependencies on external systems.
+In practice, this table might read from a streaming source such as Apache Kafka, AWS Kinesis, or Pravega.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+The query continuously consumes the stream of transactions, computes the hourly spendings, and emits results as soon as they are ready.
+Since the input is unbounded, the query keeps running until it is manually stopped.
+And because the Job uses time window-based aggregations, Flink can perform specific optimizations such as state clean up when the framework knows that no more records will arrive for a particular window.
+
+{% highlight raw %}
+# Query 3 output showing account id, timestamp, and amount
+
+# These rows are calculated continuously over the hour 
+# and output immediately at the end of the hour
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+
+# Flink begins computing these rows as soon as 
+# as the first record for the window arrives
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+
+{% endhighlight %}
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
+import org.apache.flink.walkthrough.common.table.TransactionTableSource;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Tumble;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+public class SpendReport {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
+        tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+
+        tEnv
+            .scan("transactions")
+            .window(Tumble.over("1.hour").on("timestamp").as("w"))
+            .groupBy("accountId, w")
+            .select("accountId, w.start as timestamp, amount.sum")
+            .insertInto("spend_report");
+
+        env.execute("Spend Report");
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.api.Tumble
+import org.apache.flink.table.api.scala._
+import org.apache.flink.walkthrough.common.table._
+
+object SpendReport {
+
+    def main(args: Array[String]): Unit = {
+        val env = StreamExecutionEnvironment.getExecutionEnvironment
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+        val tEnv = StreamTableEnvironment.create(env)
+
+        tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
+        tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+        tEnv
+            .scan("transactions")
+            .window(Tumble over 1.hour on 'timestamp as 'w)
+            .groupBy('accountId, 'w)
+            .select('accountId, 'w.start as 'timestamp, 'amount.sum)
+            .insertInto("spend_report")
+
+        env.execute("Spend Report")
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
diff --git a/docs/getting-started/walkthroughs/table_api.zh.md b/docs/getting-started/walkthroughs/table_api.zh.md
new file mode 100644
index 0000000..878fb54
--- /dev/null
+++ b/docs/getting-started/walkthroughs/table_api.zh.md
@@ -0,0 +1,494 @@
+---
+title: "Table API"
+nav-id: tableapiwalkthrough
+nav-title: 'Table API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, batch data sets and produce the same results.
+The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Will You Be Building? 
+
+In this tutorial, you will learn how to build a continuous ETL pipeline for tracking financial transactions by account over time.
+You will start by building your report as a nightly batch job, and then migrate to a streaming pipeline.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses. 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. 
+
+## How To Follow Along
+
+If you want to follow along, you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=spend-report \
+    -DartifactId=spend-report \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=spend-report \
+    -DartifactId=spend-report \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven's official document</a>
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
+Maven will create a project with all the dependencies to complete this tutorial.
+After importing the project into your editor, you will see a file with the following code which you can run directly inside your IDE.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+    .scan("transactions")
+    .insertInto("spend_report");
+
+env.execute("Spend Report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env  = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = BatchTableEnvironment.create(env)
+
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+val truncateDateToHour = new TruncateDateToHour
+
+tEnv
+    .scan("transactions")
+    .insertInto("spend_report")
+
+env.execute("Spend Report")
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down The Code
+
+#### The Execution Environment
+
+The first two lines set up your `ExecutionEnvironment`.
+The execution environment is how you can set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources.
+This walkthrough begins with the batch environment since you are building a periodic batch report.
+It is then wrapped in a `BatchTableEnvironment` to have full access to the Table API.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env  = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = BatchTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Registering Tables
+
+Next, tables are registered in the execution environment that you can use to connect to external systems for reading and writing both batch and streaming data.
+A table source provides access to data stored in external systems; such as a database, a key-value store, a message queue, or a file system.
+A table sink emits a table to an external storage system.
+Depending on the type of source and sink, they support different formats such as CSV, JSON, Avro, or Parquet.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+{% endhighlight %}
+</div>
+</div>
+
+Two tables are registered; a transaction input table, and a spend report output table.
+The transactions (`transactions`) table lets us read credit card transactions, which contain account ID's (`accountId`), timestamps (`timestamp`), and US$ amounts (`amount`).
+In this tutorial, the table is backed by data generated in memory to avoid any dependencies on external systems.
+In practice, the `BoundedTransactionTableSource` may be backed by a filesystem, a database, or any other static source.
+The spend report (`spend_report`) table logs each row with log level **INFO**, instead of writing to persistent storage, so you can easily see your results.
+
+#### Registering A UDF
+
+Along with the tables, a [user-defined function]({{ site.baseurl }}/dev/table/udfs.html) is registered for working with timestamps.
+This function takes a timestamp and rounds it down to the nearest hour.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val truncateDateToHour = new TruncateDateToHour
+{% endhighlight %}
+</div>
+</div>
+
+#### The Query
+
+With the environment configured and tables registered, you are ready to build your first application.
+From the `TableEnvironment` you can `scan` an input table to read its rows and then write those results into an output table using `insertInto`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+    .scan("transactions")
+    .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+    .scan("transactions")
+    .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+Initially, the Job reads all transactions and logs them out with log level **INFO**.
+
+#### Execute
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+You call `ExecutionEnvironment#execute` to begin the execution of your Job by giving it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Spend Report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Spend Report")
+{% endhighlight %}
+</div>
+</div>
+
+## Attempt One
+
+Now with the skeleton of a Job set-up, you are ready to add some business logic.
+The goal is to build a report that shows the total spend for each account across each hour of the day.
+Just like a SQL query, Flink can select the required fields and group by your keys.
+Because the timestamp field has millisecond granularity, you can use the UDF to round it down to the nearest hour.
+Finally, select all the fields, summing the total spend per account-hour pair with the built-in `sum` [aggregate function]({{ site.baseurl }}/dev/table/functions.html#aggregate-functions).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+    .scan("transactions")
+    .select("accountId, timestamp.truncateDateToHour as timestamp, amount")
+    .groupBy("accountId, timestamp")
+    .select("accountId, timestamp, amount.sum as total")
+    .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+    .scan("transactions")
+    .select('accountId, truncateDateToHour('timestamp) as 'timestamp, 'amount)
+    .groupBy('accountId, 'timestamp)
+    .select('accountId, 'timestamp, 'amount.sum as 'total)
+    .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+This query consumes all records from the `transactions` table, calculates the report, and outputs the results in an efficient, scalable manner.
+
+{% highlight raw %}
+# Query 1 output showing account id, timestamp, and amount
+
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+> 1, 2019-01-01 02:00:00.0, $859.35
+> 2, 2019-01-01 02:00:00.0, $458.40
+> 1, 2019-01-01 03:00:00.0, $330.85
+> 2, 2019-01-01 03:00:00.0, $730.02
+> 1, 2019-01-01 04:00:00.0, $585.16
+> 2, 2019-01-01 04:00:00.0, $760.76
+{% endhighlight %}
+
+## Adding Windows
+
+Grouping data based on time is a typical operation in data processing, especially when working with infinite streams.
+A grouping based on time is called a [window]({{ site.baseurl }} /dev/stream/operators/windows.html) and Flink offers flexible windowing semantics.
+The most basic type of window is called a `Tumble` window, which has a fixed size and whose buckets do not overlap.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+    .scan("transactions")
+    .window(Tumble.over("1.hour").on("timestamp").as("w"))
+    .groupBy("accountId, w")
+    .select("accountId, w.start as timestamp, amount.sum")
+    .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+    .scan("transactions")
+    .window(Tumble over 1.hour on 'timestamp as 'w)
+    .groupBy('accountId, 'w)
+    .select('accountId, 'w.start as 'timestamp, 'amount.sum)
+    .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+This defines your application as using one hour tumbling windows based on the timestamp column.
+So a row with timestamp `2019-06-01 01:23:47` is put in the `2019-06-01 01:00:00` window.
+
+Aggregations based on time are unique because time, as opposed to other attributes, generally moves forward in a continuous streaming application.
+In a batch context, windows offer a convenient API for grouping records by a timestamp attribute.
+
+Running the updated query will produce identical results as before.
+
+{% highlight raw %}
+# Query 2 output showing account id, timestamp, and amount
+
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+> 1, 2019-01-01 02:00:00.0, $859.35
+> 2, 2019-01-01 02:00:00.0, $458.40
+> 1, 2019-01-01 03:00:00.0, $330.85
+> 2, 2019-01-01 03:00:00.0, $730.02
+> 1, 2019-01-01 04:00:00.0, $585.16
+> 2, 2019-01-01 04:00:00.0, $760.76
+{% endhighlight %}
+
+## Once More, With Streaming!
+
+Because Flink's Table API offers consistent syntax and semantics for both batch and streaming, migrating from one to the other requires just two steps.
+
+The first step is to replace the batch `ExecutionEnvironment` with its streaming counterpart, `StreamExecutionEnvironment`, which creates a continuous streaming Job.
+It includes stream-specific configurations, such as the time characteristic, which when set to [event time]({{ site.baseurl }}/dev/event_time.html) guarantees consistent results even when faced with out-of-order events or a Job failure.
+This is what will be used by your `Tumble` window when grouping records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val tEnv = StreamTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+</div>
+
+The second step is to migrate from a bounded data source to an infinite data source.
+The project comes with an `UnboundedTransactionTableSource` that continuously creates transaction events in real-time.
+Similar to the `BoundedTransactionTableSource` this table is backed by data generated in memory to avoid any dependencies on external systems.
+In practice, this table might read from a streaming source such as Apache Kafka, AWS Kinesis, or Pravega.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+The query continuously consumes the stream of transactions, computes the hourly spendings, and emits results as soon as they are ready.
+Since the input is unbounded, the query keeps running until it is manually stopped.
+And because the Job uses time window-based aggregations, Flink can perform specific optimizations such as state clean up when the framework knows that no more records will arrive for a particular window.
+
+{% highlight raw %}
+# Query 3 output showing account id, timestamp, and amount
+
+# These rows are calculated continuously over the hour 
+# and output immediately at the end of the hour
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+
+# Flink begins computing these rows as soon as 
+# as the first record for the window arrives
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+
+{% endhighlight %}
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
+import org.apache.flink.walkthrough.common.table.TransactionTableSource;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Tumble;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+public class SpendReport {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
+        tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+
+        tEnv
+            .scan("transactions")
+            .window(Tumble.over("1.hour").on("timestamp").as("w"))
+            .groupBy("accountId, w")
+            .select("accountId, w.start as timestamp, amount.sum")
+            .insertInto("spend_report");
+
+        env.execute("Spend Report");
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.api.Tumble
+import org.apache.flink.table.api.scala._
+import org.apache.flink.walkthrough.common.table._
+
+object SpendReport {
+
+    def main(args: Array[String]): Unit = {
+        val env = StreamExecutionEnvironment.getExecutionEnvironment
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+        val tEnv = StreamTableEnvironment.create(env)
+
+        tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
+        tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+        tEnv
+            .scan("transactions")
+            .window(Tumble over 1.hour on 'timestamp as 'w)
+            .groupBy('accountId, 'w)
+            .select('accountId, 'w.start as 'timestamp, 'amount.sum)
+            .insertInto("spend_report")
+
+        env.execute("Spend Report")
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index b2578fc..653803a 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -136,6 +136,9 @@ run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scr
 run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
 run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
 
+run_test "Walkthrough Table Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh java"
+run_test "Walkthrough Table Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh scala"
+
 run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_gcp_pubsub.sh"
 
 run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
new file mode 100755
index 0000000..77afc58
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
@@ -0,0 +1,74 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# End to end test for quick starts test.
+# Usage:
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh <Type (java or scala)>
+
+source "$(dirname "$0")"/common.sh
+
+TEST_TYPE=$1
+
+mkdir -p "${TEST_DATA_DIR}"
+cd "${TEST_DATA_DIR}"
+
+ARTIFACT_ID=flink-walkthrough-table-${TEST_TYPE}
+ARTIFACT_VERSION=0.1
+
+mvn archetype:generate                                          \
+    -DarchetypeGroupId=org.apache.flink                         \
+    -DarchetypeArtifactId=flink-walkthrough-table-${TEST_TYPE}  \
+    -DarchetypeVersion=${FLINK_VERSION}                         \
+    -DgroupId=org.apache.flink.walkthrough                      \
+    -DartifactId=${ARTIFACT_ID}                                 \
+    -Dversion=${ARTIFACT_VERSION}                               \
+    -Dpackage=org.apache.flink.walkthrough                      \
+    -DinteractiveMode=false
+
+cd "${ARTIFACT_ID}"
+
+mvn clean package -nsu > compile-output.txt
+
+if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
+    echo "Failure: The walk-through did not successfully compile"
+    exit 1
+fi
+
+cd target
+jar tvf ${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then
+
+    echo "Success: There are no flink core classes are contained in the jar."
+else
+    echo "Failure: There are flink core classes are contained in the jar."
+    exit 1
+fi
+
+TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar
+
+add_optional_lib "table"
+
+start_cluster
+
+${FLINK_DIR}/bin/flink run "$TEST_PROGRAM_JAR"
diff --git a/flink-walkthroughs/flink-walkthrough-common/pom.xml b/flink-walkthroughs/flink-walkthrough-common/pom.xml
new file mode 100644
index 0000000..714d344
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-walkthroughs</artifactId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+	<name>flink-walkthrough-common</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Transaction.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Transaction.java
new file mode 100644
index 0000000..606d388
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Transaction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.entity;
+
+import java.util.Objects;
+
+/**
+ * A simple transaction.
+ */
+@SuppressWarnings("unused")
+public final class Transaction {
+
+	private long accountId;
+
+	private long timestamp;
+
+	private double amount;
+
+	public Transaction() { }
+
+	public Transaction(long accountId, long timestamp, double amount) {
+		this.accountId = accountId;
+		this.timestamp = timestamp;
+		this.amount = amount;
+	}
+
+	public long getAccountId() {
+		return accountId;
+	}
+
+	public void setAccountId(long accountId) {
+		this.accountId = accountId;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	public double getAmount() {
+		return amount;
+	}
+
+	public void setAmount(double amount) {
+		this.amount = amount;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		Transaction that = (Transaction) o;
+		return accountId == that.accountId &&
+			timestamp == that.timestamp &&
+			Double.compare(that.amount, amount) == 0;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(accountId, timestamp, amount);
+	}
+
+	@Override
+	public String toString() {
+		return "Transaction{" +
+			"accountId=" + accountId +
+			", timestamp=" + timestamp +
+			", amount=" + amount +
+			'}';
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/LoggerOutputFormat.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/LoggerOutputFormat.java
new file mode 100644
index 0000000..fee3a82
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/LoggerOutputFormat.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic output format that logs all records at level <b>INFO</b>.
+ */
+@Internal
+public class LoggerOutputFormat implements OutputFormat<String> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(LoggerOutputFormat.class);
+
+	@Override
+	public void configure(Configuration parameters) { }
+
+	@Override
+	public void open(int taskNumber, int numTasks) { }
+
+	@Override
+	public void writeRecord(String record) {
+		LOG.info(record);
+	}
+
+	@Override
+	public void close() { }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionIterator.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionIterator.java
new file mode 100644
index 0000000..0f87f45
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionIterator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.source;
+
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * An iterator of transaction events.
+ */
+final class TransactionIterator implements Iterator<Transaction>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Timestamp INITIAL_TIMESTAMP = Timestamp.valueOf("2019-01-01 00:00:00");
+
+	private static final long SIX_MINUTES = 6 * 60 * 1000;
+
+	private final boolean bounded;
+
+	private int index = 0;
+
+	private long timestamp;
+
+	static TransactionIterator bounded() {
+		return new TransactionIterator(true);
+	}
+
+	static TransactionIterator unbounded() {
+		return new TransactionIterator(false);
+	}
+
+	private TransactionIterator(boolean bounded) {
+		this.bounded = bounded;
+		this.timestamp = INITIAL_TIMESTAMP.getTime();
+	}
+
+	@Override
+	public boolean hasNext() {
+		if (index < data.size()) {
+			return true;
+		} else if (!bounded) {
+			index = 0;
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public Transaction next() {
+		Transaction transaction = data.get(index++);
+		transaction.setTimestamp(timestamp);
+		timestamp += SIX_MINUTES;
+		return transaction;
+	}
+
+	private static List<Transaction> data = Arrays.asList(
+		new Transaction(1, 0L, 188.23),
+		new Transaction(2, 0L, 374.79),
+		new Transaction(3, 0L, 112.15),
+		new Transaction(4, 0L, 478.75),
+		new Transaction(5, 0L, 208.85),
+		new Transaction(1, 0L, 379.64),
+		new Transaction(2, 0L, 351.44),
+		new Transaction(3, 0L, 320.75),
+		new Transaction(4, 0L, 259.42),
+		new Transaction(5, 0L, 273.44),
+		new Transaction(1, 0L, 267.25),
+		new Transaction(2, 0L, 397.15),
+		new Transaction(3, 0L, 0.219),
+		new Transaction(4, 0L, 231.94),
+		new Transaction(5, 0L, 384.73),
+		new Transaction(1, 0L, 419.62),
+		new Transaction(2, 0L, 412.91),
+		new Transaction(3, 0L, 0.77),
+		new Transaction(4, 0L, 22.10),
+		new Transaction(5, 0L, 377.54),
+		new Transaction(1, 0L, 375.44),
+		new Transaction(2, 0L, 230.18),
+		new Transaction(3, 0L, 0.80),
+		new Transaction(4, 0L, 350.89),
+		new Transaction(5, 0L, 127.55),
+		new Transaction(1, 0L, 483.91),
+		new Transaction(2, 0L, 228.22),
+		new Transaction(3, 0L, 871.15),
+		new Transaction(4, 0L, 64.19),
+		new Transaction(5, 0L, 79.43),
+		new Transaction(1, 0L, 56.12),
+		new Transaction(2, 0L, 256.48),
+		new Transaction(3, 0L, 148.16),
+		new Transaction(4, 0L, 199.95),
+		new Transaction(5, 0L, 252.37),
+		new Transaction(1, 0L, 274.73),
+		new Transaction(2, 0L, 473.54),
+		new Transaction(3, 0L, 119.92),
+		new Transaction(4, 0L, 323.59),
+		new Transaction(5, 0L, 353.16),
+		new Transaction(1, 0L, 211.90),
+		new Transaction(2, 0L, 280.93),
+		new Transaction(3, 0L, 347.89),
+		new Transaction(4, 0L, 459.86),
+		new Transaction(5, 0L, 82.31),
+		new Transaction(1, 0L, 373.26),
+		new Transaction(2, 0L, 479.83),
+		new Transaction(3, 0L, 454.25),
+		new Transaction(4, 0L, 83.64),
+		new Transaction(5, 0L, 292.44));
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionRowInputFormat.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionRowInputFormat.java
new file mode 100644
index 0000000..5acb17e
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionRowInputFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.types.Row;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+import java.sql.Timestamp;
+import java.util.Iterator;
+
+/**
+ * An bounded input of transactions.
+ */
+@Internal
+public class TransactionRowInputFormat extends GenericInputFormat<Row> implements NonParallelInput {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Iterator<Transaction> transactions;
+
+	@Override
+	public void open(GenericInputSplit split) {
+		transactions = TransactionIterator.bounded();
+	}
+
+	@Override
+	public boolean reachedEnd() {
+		return !transactions.hasNext();
+	}
+
+	@Override
+	public Row nextRecord(Row reuse) {
+		Transaction transaction = transactions.next();
+		reuse.setField(0, transaction.getAccountId());
+		reuse.setField(1, new Timestamp(transaction.getTimestamp()));
+		reuse.setField(2, transaction.getAmount());
+
+		return reuse;
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionSource.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionSource.java
new file mode 100644
index 0000000..d882b1c
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.source;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A stream of transactions.
+ */
+@Public
+public class TransactionSource extends FromIteratorFunction<Transaction> {
+
+	private static final long serialVersionUID = 1L;
+
+	public TransactionSource() {
+		super(new RateLimitedIterator<>(TransactionIterator.unbounded()));
+	}
+
+	private static class RateLimitedIterator<T> implements Iterator<T>, Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Iterator<T> inner;
+
+		private RateLimitedIterator(Iterator<T> inner) {
+			this.inner = inner;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return inner.hasNext();
+		}
+
+		@Override
+		public T next() {
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e);
+			}
+			return inner.next();
+		}
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/BoundedTransactionTableSource.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/BoundedTransactionTableSource.java
new file mode 100644
index 0000000..e51a341
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/BoundedTransactionTableSource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.walkthrough.common.source.TransactionRowInputFormat;
+
+/**
+ * A table source for reading a bounded set of transaction events.
+ *
+ * <p>This could be backed by a table, database, or other static data set.
+ */
+@PublicEvolving
+@SuppressWarnings({"deprecation", "unused"})
+public class BoundedTransactionTableSource extends InputFormatTableSource<Row> {
+	@Override
+	public InputFormat<Row, ?> getInputFormat() {
+		return new TransactionRowInputFormat();
+	}
+
+	@Override
+	public DataType getProducedDataType() {
+		return getTableSchema().toRowDataType();
+	}
+
+	@Override
+	public TableSchema getTableSchema() {
+		return TableSchema.builder()
+			.field("accountId", Types.LONG)
+			.field("timestamp", Types.SQL_TIMESTAMP)
+			.field("amount", Types.DOUBLE)
+			.build();
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
new file mode 100644
index 0000000..6c85717
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.walkthrough.common.sink.LoggerOutputFormat;
+
+/**
+ * A simple table sink for writing to stdout.
+ */
+@PublicEvolving
+@SuppressWarnings("deprecation")
+public class SpendReportTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+
+	private final TableSchema schema;
+
+	public SpendReportTableSink() {
+		this.schema = TableSchema
+			.builder()
+			.field("accountId", Types.LONG)
+			.field("timestamp", Types.SQL_TIMESTAMP)
+			.field("amount", Types.DOUBLE)
+			.build();
+	}
+
+	@Override
+	public void emitDataSet(DataSet<Row> dataSet) {
+		dataSet
+			.map(SpendReportTableSink::format)
+			.output(new LoggerOutputFormat());
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		dataStream
+			.map(SpendReportTableSink::format)
+			.writeUsingOutputFormat(new LoggerOutputFormat());
+	}
+
+	@Override
+	public TableSchema getTableSchema() {
+		return schema;
+	}
+
+	@Override
+	public DataType getConsumedDataType() {
+		return getTableSchema().toRowDataType();
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return getTableSchema().getFieldNames();
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return getTableSchema().getFieldTypes();
+	}
+
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		return this;
+	}
+
+	private static String format(Row row) {
+		//noinspection MalformedFormatString
+		return String.format("%s, %s, $%.2f", row.getField(0), row.getField(1), row.getField(2));
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/TruncateDateToHour.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/TruncateDateToHour.java
new file mode 100644
index 0000000..d9f35b9
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/TruncateDateToHour.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * A user defined function for rounding timestamps down to
+ * the nearest hour.
+ */
+@PublicEvolving
+@SuppressWarnings("unused")
+public class TruncateDateToHour extends ScalarFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final long ONE_HOUR = 60 * 60 * 1000;
+
+	public long eval(long timestamp) {
+		return timestamp - (timestamp % ONE_HOUR);
+	}
+
+	@Override
+	public TypeInformation<?> getResultType(Class<?>[] signature) {
+		return Types.SQL_TIMESTAMP;
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/UnboundedTransactionTableSource.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/UnboundedTransactionTableSource.java
new file mode 100644
index 0000000..f6114da
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/UnboundedTransactionTableSource.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.DefinedRowtimeAttributes;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A table source for reading an unbounded set of transactions.
+ *
+ * <p>This table could be backed by a message queue or other streaming data source.
+ */
+@PublicEvolving
+@SuppressWarnings({"deprecation", "unused"})
+public class UnboundedTransactionTableSource
+		implements StreamTableSource<Row>,
+		DefinedRowtimeAttributes {
+
+	@Override
+	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+		return execEnv
+			.addSource(new TransactionSource())
+			.map(transactionRowMapFunction())
+			.returns(getTableSchema().toRowType());
+	}
+
+	private MapFunction<Transaction, Row> transactionRowMapFunction() {
+		return transaction -> Row.of(
+			transaction.getAccountId(),
+			new Timestamp(transaction.getTimestamp()),
+			transaction.getAmount());
+	}
+
+	@Override
+	public DataType getProducedDataType() {
+		return getTableSchema().toRowDataType();
+	}
+
+	@Override
+	public TableSchema getTableSchema() {
+		return TableSchema.builder()
+			.field("accountId", Types.LONG)
+			.field("timestamp", Types.SQL_TIMESTAMP)
+			.field("amount", Types.DOUBLE)
+			.build();
+	}
+
+	@Override
+	public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
+		return Collections.singletonList(
+			new RowtimeAttributeDescriptor(
+				"timestamp",
+				new ExistingField("timestamp"),
+				new BoundedOutOfOrderTimestamps(100)));
+	}
+}
diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-java/pom.xml
similarity index 54%
copy from docs/getting-started/tutorials/index.md
copy to flink-walkthroughs/flink-walkthrough-table-java/pom.xml
index 9b0c8f4..8834701 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/flink-walkthroughs/flink-walkthrough-table-java/pom.xml
@@ -1,10 +1,3 @@
----
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
-nav-parent_id: getting-started
-nav-pos: 1
----
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -23,3 +16,22 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-walkthroughs</artifactId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-walkthrough-table-java</artifactId>
+	<packaging>maven-archetype</packaging>
+
+</project>
diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/META-INF/maven/archetype-metadata.xml
similarity index 52%
copy from docs/getting-started/tutorials/index.md
copy to flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/META-INF/maven/archetype-metadata.xml
index 9b0c8f4..c0e8806 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -1,10 +1,3 @@
----
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
-nav-parent_id: getting-started
-nav-pos: 1
----
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -23,3 +16,21 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
+
+<archetype-descriptor
+	xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+	name="flink-walkthrough-table-java">
+	<fileSets>
+		<fileSet filtered="true" packaged="true" encoding="UTF-8">
+			<directory>src/main/java</directory>
+			<includes>
+				<include>**/*.java</include>
+			</includes>
+		</fileSet>
+		<fileSet encoding="UTF-8">
+			<directory>src/main/resources</directory>
+		</fileSet>
+	</fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..81fdd66
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,263 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>${groupId}</groupId>
+	<artifactId>${artifactId}</artifactId>
+	<version>${version}</version>
+	<packaging>jar</packaging>
+
+	<name>Flink Walkthrough Table Java</name>
+	<url>https://flink.apache.org</url>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flink.version>@project.version@</flink.version>
+		<java.version>1.8</java.version>
+		<scala.binary.version>2.11</scala.binary.version>
+		<maven.compiler.source>${java.version}</maven.compiler.source>
+		<maven.compiler.target>${java.version}</maven.compiler.target>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>apache.snapshots</id>
+			<name>Apache Development Snapshot Repository</name>
+			<url>https://repository.apache.org/content/repositories/snapshots/</url>
+			<releases>
+				<enabled>false</enabled>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
+		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Table ecosystem -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+		<!-- Example:
+
+		<dependency>
+		    <groupId>org.apache.flink</groupId>
+		    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+		    <version>${flink.version}</version>
+		</dependency>
+		-->
+
+		<!-- Add logging framework, to produce console output when running in the IDE. -->
+		<!-- These dependencies are excluded from the application JAR by default. -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.7.7</version>
+			<scope>runtime</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.17</version>
+			<scope>runtime</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+
+			<!-- Java Compiler -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>${java.version}</source>
+					<target>${java.version}</target>
+				</configuration>
+			</plugin>
+
+			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Run shade goal on package phase -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>org.apache.flink:force-shading</exclude>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>${package}.SpendReport</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+
+				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-shade-plugin</artifactId>
+										<versionRange>[3.0.0,)</versionRange>
+										<goals>
+											<goal>shade</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-compiler-plugin</artifactId>
+										<versionRange>[3.1,)</versionRange>
+										<goals>
+											<goal>testCompile</goal>
+											<goal>compile</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
+	<!-- This profile helps to make things run out of the box in IntelliJ -->
+	<!-- Its adds Flink's core classes to the runtime class path. -->
+	<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+	<profiles>
+		<profile>
+			<id>add-dependencies-for-IDEA</id>
+
+			<activation>
+				<property>
+					<name>idea.version</name>
+				</property>
+			</activation>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-java</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/java/SpendReport.java b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/java/SpendReport.java
new file mode 100644
index 0000000..a3911ed
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/java/SpendReport.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
+import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;
+import org.apache.flink.walkthrough.common.table.TruncateDateToHour;
+
+/**
+ * Skeleton code for the table walkthrough
+ */
+public class SpendReport {
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+		tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+		tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+		tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+		tEnv
+			.scan("transactions")
+			.insertInto("spend_report");
+
+		env.execute("Spend Report");
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8deec36
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.LoggerOutputFormat=info
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-scala/pom.xml
similarity index 54%
copy from docs/getting-started/tutorials/index.md
copy to flink-walkthroughs/flink-walkthrough-table-scala/pom.xml
index 9b0c8f4..d67a59c 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/pom.xml
@@ -1,10 +1,3 @@
----
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
-nav-parent_id: getting-started
-nav-pos: 1
----
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -23,3 +16,22 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-walkthroughs</artifactId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-walkthrough-table-scala</artifactId>
+	<packaging>maven-archetype</packaging>
+
+</project>
diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
similarity index 52%
copy from docs/getting-started/tutorials/index.md
copy to flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
index 9b0c8f4..977e5ca 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -1,10 +1,3 @@
----
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
-nav-parent_id: getting-started
-nav-pos: 1
----
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -23,3 +16,21 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
+
+<archetype-descriptor
+	xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+	name="flink-walkthrough-table-scala">
+	<fileSets>
+		<fileSet filtered="true" packaged="true" encoding="UTF-8">
+			<directory>src/main/scala</directory>
+			<includes>
+				<include>**/*.scala</include>
+			</includes>
+		</fileSet>
+		<fileSet encoding="UTF-8">
+			<directory>src/main/resources</directory>
+		</fileSet>
+	</fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..ca9a181
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,300 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>${groupId}</groupId>
+	<artifactId>${artifactId}</artifactId>
+	<version>${version}</version>
+	<packaging>jar</packaging>
+
+	<name>Flink Walkthrough Table Scala</name>
+	<url>https://flink.apache.org</url>
+
+	<repositories>
+		<repository>
+			<id>apache.snapshots</id>
+			<name>Apache Development Snapshot Repository</name>
+			<url>https://repository.apache.org/content/repositories/snapshots/</url>
+			<releases>
+				<enabled>false</enabled>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flink.version>@project.version@</flink.version>
+		<scala.binary.version>2.11</scala.binary.version>
+		<scala.version>2.11.12</scala.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
+		<!-- Apache Flink dependencies -->
+		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Table ecosystem -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+		<!-- Example:
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		-->
+
+		<!-- Add logging framework, to produce console output when running in the IDE. -->
+		<!-- These dependencies are excluded from the application JAR by default. -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.7.7</version>
+			<scope>runtime</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.17</version>
+			<scope>runtime</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Run shade goal on package phase -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>org.apache.flink:force-shading</exclude>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<!-- Do not copy the signatures in the META-INF folder.
+									Otherwise, this might cause SecurityExceptions when using the JAR. -->
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>${package}.SpendReport</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Java Compiler -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>1.8</source>
+					<target>1.8</target>
+				</configuration>
+			</plugin>
+
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.2.2</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>compile</goal>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Eclipse Scala Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+	<!-- This profile helps to make things run out of the box in IntelliJ -->
+	<!-- Its adds Flink's core classes to the runtime class path. -->
+	<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+	<profiles>
+		<profile>
+			<id>add-dependencies-for-IDEA</id>
+
+			<activation>
+				<property>
+					<name>idea.version</name>
+				</property>
+			</activation>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-scala_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8deec36
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.LoggerOutputFormat=info
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/scala/SpendReport.scala b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/scala/SpendReport.scala
new file mode 100644
index 0000000..47657b8
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/scala/SpendReport.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.walkthrough.common.table._
+
+object SpendReport {
+  def main(args: Array[String]): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env)
+
+    tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+    tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+    val truncateDateToHour = new TruncateDateToHour
+
+    tEnv
+      .scan("transactions")
+      .insertInto("spend_report")
+
+    env.execute("Spend Report")
+  }
+}
diff --git a/flink-walkthroughs/pom.xml b/flink-walkthroughs/pom.xml
new file mode 100644
index 0000000..2733f59
--- /dev/null
+++ b/flink-walkthroughs/pom.xml
@@ -0,0 +1,95 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-walkthroughs</artifactId>
+	<packaging>pom</packaging>
+
+	<name>flink-walkthroughs</name>
+
+	<modules>
+		<module>flink-walkthrough-common</module>
+		<module>flink-walkthrough-table-java</module>
+		<module>flink-walkthrough-table-scala</module>
+	</modules>
+	<build>
+		<extensions>
+			<extension>
+				<groupId>org.apache.maven.archetype</groupId>
+				<artifactId>archetype-packaging</artifactId>
+				<version>2.2</version>
+			</extension>
+		</extensions>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-archetype-plugin</artifactId>
+					<version>2.2</version>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+		<plugins>
+			<plugin>
+				<artifactId>maven-archetype-plugin</artifactId>
+				<version>2.2</version><!--$NO-MVN-MAN-VER$-->
+				<configuration>
+					<skip>true</skip>
+				</configuration>
+			</plugin>
+			<!-- deactivate the shade plugin for the walkthrough archetypes -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase/>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- use alternative delimiter for filtering resources -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-resources-plugin</artifactId>
+				<configuration>
+					<useDefaultDelimiters>false</useDefaultDelimiters>
+					<delimiters>
+						<delimiter>@</delimiter>
+					</delimiters>
+				</configuration>
+			</plugin>
+		</plugins>
+		<resources>
+			<resource>
+				<directory>src/main/resources</directory>
+				<filtering>true</filtering>
+			</resource>
+		</resources>
+	</build>
+</project>
diff --git a/pom.xml b/pom.xml
index 6d77568..9300fef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@ under the License.
 		<module>flink-docs</module>
 		<module>flink-python</module>
 		<module>flink-ml-parent</module>
+		<module>flink-walkthroughs</module>
 	</modules>
 
 	<properties>