Posted to commits@flink.apache.org by nk...@apache.org on 2019/07/29 20:47:11 UTC
[flink] branch master updated: [FLINK-12747][docs] Getting Started - Table Api Walkthrough
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f4943dd [FLINK-12747][docs] Getting Started - Table Api Walkthrough
f4943dd is described below
commit f4943dd06b2ff7da899812a2aaa2e0b24c2afc01
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Wed Jun 26 14:48:38 2019 -0500
[FLINK-12747][docs] Getting Started - Table Api Walkthrough
---
docs/getting-started/examples/index.md | 2 +-
docs/getting-started/examples/index.zh.md | 2 +-
docs/getting-started/tutorials/index.md | 2 +-
docs/getting-started/tutorials/index.zh.md | 2 +-
.../{tutorials => walkthroughs}/index.md | 8 +-
.../{tutorials => walkthroughs}/index.zh.md | 8 +-
docs/getting-started/walkthroughs/table_api.md | 494 +++++++++++++++++++++
docs/getting-started/walkthroughs/table_api.zh.md | 494 +++++++++++++++++++++
flink-end-to-end-tests/run-nightly-tests.sh | 3 +
.../test-scripts/test_table_walkthroughs.sh | 74 +++
.../flink-walkthrough-common/pom.xml | 59 +++
.../walkthrough/common/entity/Transaction.java | 93 ++++
.../common/sink/LoggerOutputFormat.java | 50 +++
.../common/source/TransactionIterator.java | 130 ++++++
.../common/source/TransactionRowInputFormat.java | 60 +++
.../common/source/TransactionSource.java | 65 +++
.../table/BoundedTransactionTableSource.java | 55 +++
.../common/table/SpendReportTableSink.java | 95 ++++
.../common/table/TruncateDateToHour.java | 46 ++
.../table/UnboundedTransactionTableSource.java | 89 ++++
.../flink-walkthrough-table-java/pom.xml | 26 +-
.../META-INF/maven/archetype-metadata.xml | 25 +-
.../src/main/resources/archetype-resources/pom.xml | 263 +++++++++++
.../src/main/java/SpendReport.java | 45 ++
.../src/main/resources/log4j.properties | 24 +
.../flink-walkthrough-table-scala/pom.xml | 26 +-
.../META-INF/maven/archetype-metadata.xml | 25 +-
.../src/main/resources/archetype-resources/pom.xml | 300 +++++++++++++
.../src/main/resources/log4j.properties | 24 +
.../src/main/scala/SpendReport.scala | 41 ++
flink-walkthroughs/pom.xml | 95 ++++
pom.xml | 1 +
32 files changed, 2686 insertions(+), 40 deletions(-)
diff --git a/docs/getting-started/examples/index.md b/docs/getting-started/examples/index.md
index 6810bbd..d4d315c 100644
--- a/docs/getting-started/examples/index.md
+++ b/docs/getting-started/examples/index.md
@@ -3,7 +3,7 @@ title: Examples
nav-id: examples
nav-title: '<i class="fa fa-file-code-o title appetizer" aria-hidden="true"></i> Examples'
nav-parent_id: getting-started
-nav-pos: 2
+nav-pos: 3
nav-show_overview: true
---
<!--
diff --git a/docs/getting-started/examples/index.zh.md b/docs/getting-started/examples/index.zh.md
index 3bca74a..e0925d8 100644
--- a/docs/getting-started/examples/index.zh.md
+++ b/docs/getting-started/examples/index.zh.md
@@ -3,7 +3,7 @@ title: 示例
nav-id: examples
nav-title: '<i class="fa fa-file-code-o title appetizer" aria-hidden="true"></i> 示例'
nav-parent_id: getting-started
-nav-pos: 2
+nav-pos: 3
nav-show_overview: true
---
<!--
diff --git a/docs/getting-started/tutorials/index.md b/docs/getting-started/tutorials/index.md
index 9b0c8f4..f283489 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/docs/getting-started/tutorials/index.md
@@ -3,7 +3,7 @@ title: "Tutorials"
nav-id: tutorials
nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
nav-parent_id: getting-started
-nav-pos: 1
+nav-pos: 2
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/tutorials/index.zh.md b/docs/getting-started/tutorials/index.zh.md
index 540a6c4..ac6715b 100644
--- a/docs/getting-started/tutorials/index.zh.md
+++ b/docs/getting-started/tutorials/index.zh.md
@@ -3,7 +3,7 @@ title: "教程"
nav-id: tutorials
nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> 教程'
nav-parent_id: getting-started
-nav-pos: 1
+nav-pos: 2
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/tutorials/index.md b/docs/getting-started/walkthroughs/index.md
similarity index 84%
copy from docs/getting-started/tutorials/index.md
copy to docs/getting-started/walkthroughs/index.md
index 9b0c8f4..ae2f536 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/docs/getting-started/walkthroughs/index.md
@@ -1,7 +1,7 @@
---
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
+title: "Code Walkthroughs"
+nav-id: walkthroughs
+nav-title: '<i class="fa fa-bolt title appetizer" aria-hidden="true"></i> Code Walkthroughs'
nav-parent_id: getting-started
nav-pos: 1
---
@@ -22,4 +22,4 @@ software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
--->
+-->
\ No newline at end of file
diff --git a/docs/getting-started/tutorials/index.zh.md b/docs/getting-started/walkthroughs/index.zh.md
similarity index 84%
copy from docs/getting-started/tutorials/index.zh.md
copy to docs/getting-started/walkthroughs/index.zh.md
index 540a6c4..ae2f536 100644
--- a/docs/getting-started/tutorials/index.zh.md
+++ b/docs/getting-started/walkthroughs/index.zh.md
@@ -1,7 +1,7 @@
---
-title: "教程"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> 教程'
+title: "Code Walkthroughs"
+nav-id: walkthroughs
+nav-title: '<i class="fa fa-bolt title appetizer" aria-hidden="true"></i> Code Walkthroughs'
nav-parent_id: getting-started
nav-pos: 1
---
@@ -22,4 +22,4 @@ software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
--->
+-->
\ No newline at end of file
diff --git a/docs/getting-started/walkthroughs/table_api.md b/docs/getting-started/walkthroughs/table_api.md
new file mode 100644
index 0000000..878fb54
--- /dev/null
+++ b/docs/getting-started/walkthroughs/table_api.md
@@ -0,0 +1,494 @@
+---
+title: "Table API"
+nav-id: tableapiwalkthrough
+nav-title: 'Table API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, batch data sets and produce the same results.
+The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Will You Be Building?
+
+In this tutorial, you will learn how to build a continuous ETL pipeline for tracking financial transactions by account over time.
+You will start by building your report as a nightly batch job, and then migrate to a streaming pipeline.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses.
+
+## Help, I’m Stuck!
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+
+## How To Follow Along
+
+If you want to follow along, you will require a computer with:
+
+* Java 8
+* Maven
+
+Flink provides a Maven Archetype that quickly creates a skeleton project with all the necessary dependencies:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless site.is_stable %}
+ -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+ -DarchetypeVersion={{ site.version }} \
+ -DgroupId=spend-report \
+ -DartifactId=spend-report \
+ -Dversion=0.1 \
+ -Dpackage=spendreport \
+ -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless site.is_stable %}
+ -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+ -DarchetypeVersion={{ site.version }} \
+ -DgroupId=spend-report \
+ -DartifactId=spend-report \
+ -Dversion=0.1 \
+ -Dpackage=spendreport \
+ -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+ <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) on the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven's official documentation</a>.
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId`, and `package` if you like. With the above parameters,
+Maven will create a project with all the dependencies required to complete this tutorial.
+After importing the project into your editor, you will see a file with the following code, which you can run directly inside your IDE.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+ .scan("transactions")
+ .insertInto("spend_report");
+
+env.execute("Spend Report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = BatchTableEnvironment.create(env)
+
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+val truncateDateToHour = new TruncateDateToHour
+
+tEnv
+ .scan("transactions")
+ .insertInto("spend_report")
+
+env.execute("Spend Report")
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down The Code
+
+#### The Execution Environment
+
+The first two lines set up your `ExecutionEnvironment`.
+The execution environment is where you set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources.
+This walkthrough begins with the batch environment since you are building a periodic batch report.
+It is then wrapped in a `BatchTableEnvironment`, which gives you full access to the Table API.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = BatchTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Registering Tables
+
+Next, tables are registered in the execution environment; you can use them to connect to external systems for reading and writing both batch and streaming data.
+A table source provides access to data stored in external systems, such as a database, a key-value store, a message queue, or a file system.
+A table sink emits a table to an external storage system.
+Depending on the type of source and sink, they support different formats such as CSV, JSON, Avro, or Parquet.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+{% endhighlight %}
+</div>
+</div>
+
+Two tables are registered: a transaction input table and a spend report output table.
+The transactions (`transactions`) table lets us read credit card transactions, which contain account IDs (`accountId`), timestamps (`timestamp`), and US$ amounts (`amount`).
+In this tutorial, the table is backed by data generated in memory to avoid any dependencies on external systems.
+In practice, the `BoundedTransactionTableSource` may be backed by a filesystem, a database, or any other static source.
+The spend report (`spend_report`) table logs each row with log level **INFO**, instead of writing to persistent storage, so you can easily see your results.
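+
+The `BoundedTransactionTableSource` implementation is not shown on this page, but a minimal sketch of a bounded source over in-memory rows might look like the following, assuming the 1.9-era `BatchTableSource` interface. The class name and the single hard-coded row are illustrative only; the real source in `flink-walkthrough-common` generates a full data set.
+
+{% highlight java %}
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+
+// Hypothetical sketch; not the actual BoundedTransactionTableSource from this commit.
+public class InMemoryTransactionSource implements BatchTableSource<Row> {
+
+    @Override
+    public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+        // A single hard-coded transaction row: accountId, timestamp, amount.
+        return execEnv.fromCollection(
+            Collections.singletonList(Row.of(1L, new Timestamp(0L), 188.23)),
+            getReturnType());
+    }
+
+    @Override
+    public TypeInformation<Row> getReturnType() {
+        return Types.ROW_NAMED(
+            new String[] {"accountId", "timestamp", "amount"},
+            Types.LONG, Types.SQL_TIMESTAMP, Types.DOUBLE);
+    }
+
+    @Override
+    public TableSchema getTableSchema() {
+        return TableSchema.builder()
+            .field("accountId", Types.LONG)
+            .field("timestamp", Types.SQL_TIMESTAMP)
+            .field("amount", Types.DOUBLE)
+            .build();
+    }
+}
+{% endhighlight %}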
+
+#### Registering A UDF
+
+Along with the tables, a [user-defined function]({{ site.baseurl }}/dev/table/udfs.html) is registered for working with timestamps.
+This function takes a timestamp and rounds it down to the nearest hour.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val truncateDateToHour = new TruncateDateToHour
+{% endhighlight %}
+</div>
+</div>
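+
+The body of `TruncateDateToHour` is not shown on this page, but a scalar UDF of this shape is small. Below is a minimal sketch, assuming the timestamp arrives as epoch milliseconds; the actual class in `flink-walkthrough-common` may differ in details such as its declared result type.
+
+{% highlight java %}
+import org.apache.flink.table.functions.ScalarFunction;
+
+// Hypothetical sketch; not the actual TruncateDateToHour from this commit.
+public class TruncateDateToHour extends ScalarFunction {
+
+    private static final long ONE_HOUR = 60 * 60 * 1000;
+
+    public long eval(long timestamp) {
+        // Round the epoch-millisecond timestamp down to the start of its hour.
+        return timestamp - (timestamp % ONE_HOUR);
+    }
+}
+{% endhighlight %}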
+
+#### The Query
+
+With the environment configured and tables registered, you are ready to build your first application.
+From the `TableEnvironment` you can `scan` an input table to read its rows and then write those results into an output table using `insertInto`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+ .scan("transactions")
+ .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+ .scan("transactions")
+ .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+Initially, the Job reads all transactions and logs them out with log level **INFO**.
+
+#### Execute
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+You call `ExecutionEnvironment#execute`, giving your Job a name, to begin its execution.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Spend Report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Spend Report")
+{% endhighlight %}
+</div>
+</div>
+
+## Attempt One
+
+Now, with the skeleton of a Job set up, you are ready to add some business logic.
+The goal is to build a report that shows the total spend for each account across each hour of the day.
+Just like a SQL query, Flink can select the required fields and group by your keys.
+Because the timestamp field has millisecond granularity, you can use the UDF to round it down to the nearest hour.
+Finally, select all the fields, summing the total spend per account-hour pair with the built-in `sum` [aggregate function]({{ site.baseurl }}/dev/table/functions.html#aggregate-functions).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+ .scan("transactions")
+ .select("accountId, timestamp.truncateDateToHour as timestamp, amount")
+ .groupBy("accountId, timestamp")
+ .select("accountId, timestamp, amount.sum as total")
+ .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+ .scan("transactions")
+ .select('accountId, truncateDateToHour('timestamp) as 'timestamp, 'amount)
+ .groupBy('accountId, 'timestamp)
+ .select('accountId, 'timestamp, 'amount.sum as 'total)
+ .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+This query consumes all records from the `transactions` table, calculates the report, and outputs the results in an efficient, scalable manner.
+
+{% highlight raw %}
+# Query 1 output showing account id, timestamp, and amount
+
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+> 1, 2019-01-01 02:00:00.0, $859.35
+> 2, 2019-01-01 02:00:00.0, $458.40
+> 1, 2019-01-01 03:00:00.0, $330.85
+> 2, 2019-01-01 03:00:00.0, $730.02
+> 1, 2019-01-01 04:00:00.0, $585.16
+> 2, 2019-01-01 04:00:00.0, $760.76
+{% endhighlight %}
+
+## Adding Windows
+
+Grouping data based on time is a typical operation in data processing, especially when working with infinite streams.
+A grouping based on time is called a [window]({{ site.baseurl }}/dev/stream/operators/windows.html) and Flink offers flexible windowing semantics.
+The most basic type of window is called a `Tumble` window, which has a fixed size and whose buckets do not overlap.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+ .scan("transactions")
+ .window(Tumble.over("1.hour").on("timestamp").as("w"))
+ .groupBy("accountId, w")
+ .select("accountId, w.start as timestamp, amount.sum")
+ .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+ .scan("transactions")
+ .window(Tumble over 1.hour on 'timestamp as 'w)
+ .groupBy('accountId, 'w)
+ .select('accountId, 'w.start as 'timestamp, 'amount.sum)
+ .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+This defines your application as using one-hour tumbling windows based on the timestamp column.
+So a row with timestamp `2019-06-01 01:23:47` is put in the `2019-06-01 01:00:00` window.
+
+Aggregations based on time are unique because time, as opposed to other attributes, generally moves forward in a continuous streaming application.
+In a batch context, windows offer a convenient API for grouping records by a timestamp attribute.
+
+Running the updated query will produce the same results as before.
+
+{% highlight raw %}
+# Query 2 output showing account id, timestamp, and amount
+
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+> 1, 2019-01-01 02:00:00.0, $859.35
+> 2, 2019-01-01 02:00:00.0, $458.40
+> 1, 2019-01-01 03:00:00.0, $330.85
+> 2, 2019-01-01 03:00:00.0, $730.02
+> 1, 2019-01-01 04:00:00.0, $585.16
+> 2, 2019-01-01 04:00:00.0, $760.76
+{% endhighlight %}
+
+## Once More, With Streaming!
+
+Because Flink's Table API offers consistent syntax and semantics for both batch and streaming, migrating from one to the other requires just two steps.
+
+The first step is to replace the batch `ExecutionEnvironment` with its streaming counterpart, `StreamExecutionEnvironment`, which creates a continuous streaming Job.
+It includes stream-specific configurations, such as the time characteristic, which, when set to [event time]({{ site.baseurl }}/dev/event_time.html), guarantees consistent results even when faced with out-of-order events or a Job failure.
+This is what will be used by your `Tumble` window when grouping records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val tEnv = StreamTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+</div>
+
+The second step is to migrate from a bounded data source to an infinite data source.
+The project comes with an `UnboundedTransactionTableSource` that continuously creates transaction events in real time.
+Similar to the `BoundedTransactionTableSource`, this table is backed by data generated in memory to avoid any dependencies on external systems.
+In practice, this table might read from a streaming source such as Apache Kafka, AWS Kinesis, or Pravega; an illustrative sketch of a Kafka-backed registration follows the snippet below.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
+{% endhighlight %}
+</div>
+</div>
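+
+For illustration, wiring the same `transactions` table to Apache Kafka with the 1.9-era connector descriptor API (`Kafka`, `Json`, and `Schema` from `org.apache.flink.table.descriptors`) might look roughly like the sketch below. The topic name, broker address, and JSON format are assumptions, not part of this walkthrough, and the flink-connector-kafka and flink-json dependencies would need to be on the classpath.
+
+{% highlight java %}
+tEnv.connect(
+        new Kafka()
+            .version("universal")
+            .topic("transactions")                           // hypothetical topic name
+            .property("bootstrap.servers", "localhost:9092") // hypothetical broker address
+            .startFromEarliest())
+    .withFormat(new Json())                                  // assumes JSON-encoded records
+    .withSchema(
+        new Schema()
+            .field("accountId", Types.LONG)
+            .field("timestamp", Types.SQL_TIMESTAMP)
+            .field("amount", Types.DOUBLE))
+    .inAppendMode()
+    .registerTableSource("transactions");
+{% endhighlight %}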
+
+And that's it, a fully functional, stateful, distributed streaming application!
+The query continuously consumes the stream of transactions, computes the hourly spending, and emits results as soon as they are ready.
+Since the input is unbounded, the query keeps running until it is manually stopped.
+And because the Job uses time window-based aggregations, Flink can perform specific optimizations, such as state cleanup, when the framework knows that no more records will arrive for a particular window.
+
+{% highlight raw %}
+# Query 3 output showing account id, timestamp, and amount
+
+# These rows are calculated continuously over the hour
+# and output immediately at the end of the hour
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+
+# Flink begins computing these rows as soon as
+# the first record for the window arrives
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+
+{% endhighlight %}
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
+import org.apache.flink.walkthrough.common.table.UnboundedTransactionTableSource;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Tumble;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+public class SpendReport {
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
+ tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+
+ tEnv
+ .scan("transactions")
+ .window(Tumble.over("1.hour").on("timestamp").as("w"))
+ .groupBy("accountId, w")
+ .select("accountId, w.start as timestamp, amount.sum")
+ .insertInto("spend_report");
+
+ env.execute("Spend Report");
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.api.Tumble
+import org.apache.flink.table.api.scala._
+import org.apache.flink.walkthrough.common.table._
+
+object SpendReport {
+
+ def main(args: Array[String]): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ val tEnv = StreamTableEnvironment.create(env)
+
+ tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
+ tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+ tEnv
+ .scan("transactions")
+ .window(Tumble over 1.hour on 'timestamp as 'w)
+ .groupBy('accountId, 'w)
+ .select('accountId, 'w.start as 'timestamp, 'amount.sum)
+ .insertInto("spend_report")
+
+ env.execute("Spend Report")
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
diff --git a/docs/getting-started/walkthroughs/table_api.zh.md b/docs/getting-started/walkthroughs/table_api.zh.md
new file mode 100644
index 0000000..878fb54
--- /dev/null
+++ b/docs/getting-started/walkthroughs/table_api.zh.md
@@ -0,0 +1,494 @@
+---
+title: "Table API"
+nav-id: tableapiwalkthrough
+nav-title: 'Table API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, batch data sets and produce the same results.
+The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Will You Be Building?
+
+In this tutorial, you will learn how to build a continuous ETL pipeline for tracking financial transactions by account over time.
+You will start by building your report as a nightly batch job, and then migrate to a streaming pipeline.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses.
+
+## Help, I’m Stuck!
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+
+## How To Follow Along
+
+If you want to follow along, you will require a computer with:
+
+* Java 8
+* Maven
+
+Flink provides a Maven Archetype that quickly creates a skeleton project with all the necessary dependencies:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless site.is_stable %}
+ -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+ -DarchetypeVersion={{ site.version }} \
+ -DgroupId=spend-report \
+ -DartifactId=spend-report \
+ -Dversion=0.1 \
+ -Dpackage=spendreport \
+ -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless site.is_stable %}
+ -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+ -DarchetypeVersion={{ site.version }} \
+ -DgroupId=spend-report \
+ -DartifactId=spend-report \
+ -Dversion=0.1 \
+ -Dpackage=spendreport \
+ -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+ <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) on the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven's official documentation</a>.
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId`, and `package` if you like. With the above parameters,
+Maven will create a project with all the dependencies required to complete this tutorial.
+After importing the project into your editor, you will see a file with the following code, which you can run directly inside your IDE.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+ .scan("transactions")
+ .insertInto("spend_report");
+
+env.execute("Spend Report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = BatchTableEnvironment.create(env)
+
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+val truncateDateToHour = new TruncateDateToHour
+
+tEnv
+ .scan("transactions")
+ .insertInto("spend_report")
+
+env.execute("Spend Report")
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down The Code
+
+#### The Execution Environment
+
+The first two lines set up your `ExecutionEnvironment`.
+The execution environment is where you set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources.
+This walkthrough begins with the batch environment since you are building a periodic batch report.
+It is then wrapped in a `BatchTableEnvironment`, which gives you full access to the Table API.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = BatchTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Registering Tables
+
+Next, tables are registered in the execution environment; you can use them to connect to external systems for reading and writing both batch and streaming data.
+A table source provides access to data stored in external systems, such as a database, a key-value store, a message queue, or a file system.
+A table sink emits a table to an external storage system.
+Depending on the type of source and sink, they support different formats such as CSV, JSON, Avro, or Parquet.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+{% endhighlight %}
+</div>
+</div>
+
+Two tables are registered: a transaction input table and a spend report output table.
+The transactions (`transactions`) table lets us read credit card transactions, which contain account IDs (`accountId`), timestamps (`timestamp`), and US$ amounts (`amount`).
+In this tutorial, the table is backed by data generated in memory to avoid any dependencies on external systems.
+In practice, the `BoundedTransactionTableSource` may be backed by a filesystem, a database, or any other static source.
+The spend report (`spend_report`) table logs each row with log level **INFO**, instead of writing to persistent storage, so you can easily see your results.
+
+#### Registering A UDF
+
+Along with the tables, a [user-defined function]({{ site.baseurl }}/dev/table/udfs.html) is registered for working with timestamps.
+This function takes a timestamp and rounds it down to the nearest hour.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val truncateDateToHour = new TruncateDateToHour
+{% endhighlight %}
+</div>
+</div>
+
+#### The Query
+
+With the environment configured and tables registered, you are ready to build your first application.
+From the `TableEnvironment` you can `scan` an input table to read its rows and then write those results into an output table using `insertInto`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+ .scan("transactions")
+ .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+ .scan("transactions")
+ .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+Initially, the Job reads all transactions and logs them out with log level **INFO**.
+
+#### Execute
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+You call `ExecutionEnvironment#execute`, giving your Job a name, to begin its execution.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Spend Report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Spend Report")
+{% endhighlight %}
+</div>
+</div>
+
+## Attempt One
+
+Now, with the skeleton of a Job set up, you are ready to add some business logic.
+The goal is to build a report that shows the total spend for each account across each hour of the day.
+Just like a SQL query, Flink can select the required fields and group by your keys.
+Because the timestamp field has millisecond granularity, you can use the UDF to round it down to the nearest hour.
+Finally, select all the fields, summing the total spend per account-hour pair with the built-in `sum` [aggregate function]({{ site.baseurl }}/dev/table/functions.html#aggregate-functions).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+ .scan("transactions")
+ .select("accountId, timestamp.truncateDateToHour as timestamp, amount")
+ .groupBy("accountId, timestamp")
+ .select("accountId, timestamp, amount.sum as total")
+ .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+ .scan("transactions")
+ .select('accountId, truncateDateToHour('timestamp) as 'timestamp, 'amount)
+ .groupBy('accountId, 'timestamp)
+ .select('accountId, 'timestamp, 'amount.sum as 'total)
+ .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+This query consumes all records from the `transactions` table, calculates the report, and outputs the results in an efficient, scalable manner.
+
+{% highlight raw %}
+# Query 1 output showing account id, timestamp, and amount
+
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+> 1, 2019-01-01 02:00:00.0, $859.35
+> 2, 2019-01-01 02:00:00.0, $458.40
+> 1, 2019-01-01 03:00:00.0, $330.85
+> 2, 2019-01-01 03:00:00.0, $730.02
+> 1, 2019-01-01 04:00:00.0, $585.16
+> 2, 2019-01-01 04:00:00.0, $760.76
+{% endhighlight %}
+
+## Adding Windows
+
+Grouping data based on time is a typical operation in data processing, especially when working with infinite streams.
+A grouping based on time is called a [window]({{ site.baseurl }}/dev/stream/operators/windows.html) and Flink offers flexible windowing semantics.
+The most basic type of window is called a `Tumble` window, which has a fixed size and whose buckets do not overlap.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv
+ .scan("transactions")
+ .window(Tumble.over("1.hour").on("timestamp").as("w"))
+ .groupBy("accountId, w")
+ .select("accountId, w.start as timestamp, amount.sum")
+ .insertInto("spend_report");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv
+ .scan("transactions")
+ .window(Tumble over 1.hour on 'timestamp as 'w)
+ .groupBy('accountId, 'w)
+ .select('accountId, 'w.start as 'timestamp, 'amount.sum)
+ .insertInto("spend_report")
+{% endhighlight %}
+</div>
+</div>
+
+This defines your application as using one-hour tumbling windows based on the timestamp column.
+So a row with timestamp `2019-06-01 01:23:47` is put in the `2019-06-01 01:00:00` window.
+
+Aggregations based on time are unique because time, as opposed to other attributes, generally moves forward in a continuous streaming application.
+In a batch context, windows offer a convenient API for grouping records by a timestamp attribute.
+
+Running the updated query will produce the same results as before.
+
+{% highlight raw %}
+# Query 2 output showing account id, timestamp, and amount
+
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+> 1, 2019-01-01 02:00:00.0, $859.35
+> 2, 2019-01-01 02:00:00.0, $458.40
+> 1, 2019-01-01 03:00:00.0, $330.85
+> 2, 2019-01-01 03:00:00.0, $730.02
+> 1, 2019-01-01 04:00:00.0, $585.16
+> 2, 2019-01-01 04:00:00.0, $760.76
+{% endhighlight %}
+
+## Once More, With Streaming!
+
+Because Flink's Table API offers consistent syntax and semantics for both batch and streaming, migrating from one to the other requires just two steps.
+
+The first step is to replace the batch `ExecutionEnvironment` with its streaming counterpart, `StreamExecutionEnvironment`, which creates a continuous streaming Job.
+It includes stream-specific configurations, such as the time characteristic, which, when set to [event time]({{ site.baseurl }}/dev/event_time.html), guarantees consistent results even when faced with out-of-order events or a Job failure.
+This is what will be used by your `Tumble` window when grouping records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val tEnv = StreamTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+</div>
+
+The second step is to migrate from a bounded data source to an infinite data source.
+The project comes with an `UnboundedTransactionTableSource` that continuously creates transaction events in real time.
+Similar to the `BoundedTransactionTableSource`, this table is backed by data generated in memory to avoid any dependencies on external systems.
+In practice, this table might read from a streaming source such as Apache Kafka, AWS Kinesis, or Pravega.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+The query continuously consumes the stream of transactions, computes the hourly spending, and emits results as soon as they are ready.
+Since the input is unbounded, the query keeps running until it is manually stopped.
+And because the Job uses time window-based aggregations, Flink can perform specific optimizations, such as state cleanup, when the framework knows that no more records will arrive for a particular window.
+
+{% highlight raw %}
+# Query 3 output showing account id, timestamp, and amount
+
+# These rows are calculated continuously over the hour
+# and output immediately at the end of the hour
+> 1, 2019-01-01 00:00:00.0, $567.87
+> 2, 2019-01-01 00:00:00.0, $726.23
+
+# Flink begins computing these rows as soon as
+# the first record for the window arrives
+> 1, 2019-01-01 01:00:00.0, $686.87
+> 2, 2019-01-01 01:00:00.0, $810.06
+
+{% endhighlight %}
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
+import org.apache.flink.walkthrough.common.table.UnboundedTransactionTableSource;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Tumble;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+public class SpendReport {
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
+ tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+
+ tEnv
+ .scan("transactions")
+ .window(Tumble.over("1.hour").on("timestamp").as("w"))
+ .groupBy("accountId, w")
+ .select("accountId, w.start as timestamp, amount.sum")
+ .insertInto("spend_report");
+
+ env.execute("Spend Report");
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.api.Tumble
+import org.apache.flink.table.api.scala._
+import org.apache.flink.walkthrough.common.table._
+
+object SpendReport {
+
+ def main(args: Array[String]): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ val tEnv = StreamTableEnvironment.create(env)
+
+ tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
+ tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+ tEnv
+ .scan("transactions")
+ .window(Tumble over 1.hour on 'timestamp as 'w)
+ .groupBy('accountId, 'w)
+ .select('accountId, 'w.start as 'timestamp, 'amount.sum)
+ .insertInto("spend_report")
+
+ env.execute("Spend Report")
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index b2578fc..653803a 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -136,6 +136,9 @@ run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scr
run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
+run_test "Walkthrough Table Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh java"
+run_test "Walkthrough Table Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh scala"
+
run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_gcp_pubsub.sh"
run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
new file mode 100755
index 0000000..77afc58
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
@@ -0,0 +1,74 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# End-to-end test for the Table API walkthroughs.
+# Usage:
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh <Type (java or scala)>
+
+source "$(dirname "$0")"/common.sh
+
+TEST_TYPE=$1
+
+mkdir -p "${TEST_DATA_DIR}"
+cd "${TEST_DATA_DIR}"
+
+ARTIFACT_ID=flink-walkthrough-table-${TEST_TYPE}
+ARTIFACT_VERSION=0.1
+
+mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-walkthrough-table-${TEST_TYPE} \
+ -DarchetypeVersion=${FLINK_VERSION} \
+ -DgroupId=org.apache.flink.walkthrough \
+ -DartifactId=${ARTIFACT_ID} \
+ -Dversion=${ARTIFACT_VERSION} \
+ -Dpackage=org.apache.flink.walkthrough \
+ -DinteractiveMode=false
+
+cd "${ARTIFACT_ID}"
+
+mvn clean package -nsu > compile-output.txt
+
+if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
+ echo "Failure: The walk-through did not successfully compile"
+ exit 1
+fi
+
+cd target
+jar tvf ${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar > contentsInJar.txt
+
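+# The user jar must not bundle Flink core classes; the cluster provides them at runtime.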
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+ `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \
+ `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \
+ `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \
+ `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then
+
+ echo "Success: The jar does not contain any Flink core classes."
+else
+ echo "Failure: The jar contains Flink core classes."
+ exit 1
+fi
+
+TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar
+
+add_optional_lib "table"
+
+start_cluster
+
+${FLINK_DIR}/bin/flink run "$TEST_PROGRAM_JAR"
diff --git a/flink-walkthroughs/flink-walkthrough-common/pom.xml b/flink-walkthroughs/flink-walkthrough-common/pom.xml
new file mode 100644
index 0000000..714d344
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-walkthroughs</artifactId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+ <name>flink-walkthrough-common</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Transaction.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Transaction.java
new file mode 100644
index 0000000..606d388
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Transaction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.entity;
+
+import java.util.Objects;
+
+/**
+ * A simple transaction.
+ */
+@SuppressWarnings("unused")
+public final class Transaction {
+
+ private long accountId;
+
+ private long timestamp;
+
+ private double amount;
+
+ public Transaction() { }
+
+ public Transaction(long accountId, long timestamp, double amount) {
+ this.accountId = accountId;
+ this.timestamp = timestamp;
+ this.amount = amount;
+ }
+
+ public long getAccountId() {
+ return accountId;
+ }
+
+ public void setAccountId(long accountId) {
+ this.accountId = accountId;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public double getAmount() {
+ return amount;
+ }
+
+ public void setAmount(double amount) {
+ this.amount = amount;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Transaction that = (Transaction) o;
+ return accountId == that.accountId &&
+ timestamp == that.timestamp &&
+ Double.compare(that.amount, amount) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(accountId, timestamp, amount);
+ }
+
+ @Override
+ public String toString() {
+ return "Transaction{" +
+ "accountId=" + accountId +
+ ", timestamp=" + timestamp +
+ ", amount=" + amount +
+ '}';
+ }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/LoggerOutputFormat.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/LoggerOutputFormat.java
new file mode 100644
index 0000000..fee3a82
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/LoggerOutputFormat.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic output format that logs all records at level <b>INFO</b>.
+ */
+@Internal
+public class LoggerOutputFormat implements OutputFormat<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoggerOutputFormat.class);
+
+ @Override
+ public void configure(Configuration parameters) { }
+
+ @Override
+ public void open(int taskNumber, int numTasks) { }
+
+ @Override
+ public void writeRecord(String record) {
+ LOG.info(record);
+ }
+
+ @Override
+ public void close() { }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionIterator.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionIterator.java
new file mode 100644
index 0000000..0f87f45
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionIterator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.source;
+
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * An iterator of transaction events.
+ */
+final class TransactionIterator implements Iterator<Transaction>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Timestamp INITIAL_TIMESTAMP = Timestamp.valueOf("2019-01-01 00:00:00");
+
+ private static final long SIX_MINUTES = 6 * 60 * 1000;
+
+ private final boolean bounded;
+
+ private int index = 0;
+
+ private long timestamp;
+
+ static TransactionIterator bounded() {
+ return new TransactionIterator(true);
+ }
+
+ static TransactionIterator unbounded() {
+ return new TransactionIterator(false);
+ }
+
+ private TransactionIterator(boolean bounded) {
+ this.bounded = bounded;
+ this.timestamp = INITIAL_TIMESTAMP.getTime();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (index < data.size()) {
+ return true;
+ } else if (!bounded) {
+ index = 0;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public Transaction next() {
+ Transaction transaction = data.get(index++);
+ transaction.setTimestamp(timestamp);
+ timestamp += SIX_MINUTES;
+ return transaction;
+ }
+
+	private static final List<Transaction> data = Arrays.asList(
+ new Transaction(1, 0L, 188.23),
+ new Transaction(2, 0L, 374.79),
+ new Transaction(3, 0L, 112.15),
+ new Transaction(4, 0L, 478.75),
+ new Transaction(5, 0L, 208.85),
+ new Transaction(1, 0L, 379.64),
+ new Transaction(2, 0L, 351.44),
+ new Transaction(3, 0L, 320.75),
+ new Transaction(4, 0L, 259.42),
+ new Transaction(5, 0L, 273.44),
+ new Transaction(1, 0L, 267.25),
+ new Transaction(2, 0L, 397.15),
+ new Transaction(3, 0L, 0.219),
+ new Transaction(4, 0L, 231.94),
+ new Transaction(5, 0L, 384.73),
+ new Transaction(1, 0L, 419.62),
+ new Transaction(2, 0L, 412.91),
+ new Transaction(3, 0L, 0.77),
+ new Transaction(4, 0L, 22.10),
+ new Transaction(5, 0L, 377.54),
+ new Transaction(1, 0L, 375.44),
+ new Transaction(2, 0L, 230.18),
+ new Transaction(3, 0L, 0.80),
+ new Transaction(4, 0L, 350.89),
+ new Transaction(5, 0L, 127.55),
+ new Transaction(1, 0L, 483.91),
+ new Transaction(2, 0L, 228.22),
+ new Transaction(3, 0L, 871.15),
+ new Transaction(4, 0L, 64.19),
+ new Transaction(5, 0L, 79.43),
+ new Transaction(1, 0L, 56.12),
+ new Transaction(2, 0L, 256.48),
+ new Transaction(3, 0L, 148.16),
+ new Transaction(4, 0L, 199.95),
+ new Transaction(5, 0L, 252.37),
+ new Transaction(1, 0L, 274.73),
+ new Transaction(2, 0L, 473.54),
+ new Transaction(3, 0L, 119.92),
+ new Transaction(4, 0L, 323.59),
+ new Transaction(5, 0L, 353.16),
+ new Transaction(1, 0L, 211.90),
+ new Transaction(2, 0L, 280.93),
+ new Transaction(3, 0L, 347.89),
+ new Transaction(4, 0L, 459.86),
+ new Transaction(5, 0L, 82.31),
+ new Transaction(1, 0L, 373.26),
+ new Transaction(2, 0L, 479.83),
+ new Transaction(3, 0L, 454.25),
+ new Transaction(4, 0L, 83.64),
+ new Transaction(5, 0L, 292.44));
+}
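
TransactionIterator is package-private, so jobs reach it through TransactionSource and TransactionRowInputFormat below. A hypothetical same-package probe of its semantics: the bounded variant replays the fixed list once, while the unbounded variant cycles through it forever, stamping each event six minutes after the previous one.

    package org.apache.flink.walkthrough.common.source;

    import org.apache.flink.walkthrough.common.entity.Transaction;

    import java.util.Iterator;

    public class TransactionIteratorProbe {
        public static void main(String[] args) {
            Iterator<Transaction> bounded = TransactionIterator.bounded();
            int count = 0;
            while (bounded.hasNext()) {
                bounded.next();
                count++;
            }
            System.out.println(count); // 50: the fixed list above, replayed once

            Iterator<Transaction> unbounded = TransactionIterator.unbounded();
            System.out.println(unbounded.next().getTimestamp()); // 2019-01-01 00:00:00 as epoch millis
            System.out.println(unbounded.next().getTimestamp()); // six minutes later
        }
    }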
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionRowInputFormat.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionRowInputFormat.java
new file mode 100644
index 0000000..5acb17e
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionRowInputFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.types.Row;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+import java.sql.Timestamp;
+import java.util.Iterator;
+
+/**
+ * A bounded input of transactions.
+ */
+@Internal
+public class TransactionRowInputFormat extends GenericInputFormat<Row> implements NonParallelInput {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Iterator<Transaction> transactions;
+
+ @Override
+ public void open(GenericInputSplit split) {
+ transactions = TransactionIterator.bounded();
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return !transactions.hasNext();
+ }
+
+ @Override
+ public Row nextRecord(Row reuse) {
+ Transaction transaction = transactions.next();
+ reuse.setField(0, transaction.getAccountId());
+ reuse.setField(1, new Timestamp(transaction.getTimestamp()));
+ reuse.setField(2, transaction.getAmount());
+
+ return reuse;
+ }
+}
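
A sketch of consuming this input format directly with the DataSet API; the Row type information has to be supplied explicitly because GenericInputFormat<Row> cannot infer it (class name illustrative):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.walkthrough.common.source.TransactionRowInputFormat;

    public class TransactionRowExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // (accountId, timestamp, amount) rows, read once on a single split
            DataSet<Row> rows = env.createInput(
                new TransactionRowInputFormat(),
                Types.ROW(Types.LONG, Types.SQL_TIMESTAMP, Types.DOUBLE));

            rows.print();
        }
    }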
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionSource.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionSource.java
new file mode 100644
index 0000000..d882b1c
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionSource.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.source;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A stream of transactions.
+ */
+@Public
+public class TransactionSource extends FromIteratorFunction<Transaction> {
+
+ private static final long serialVersionUID = 1L;
+
+ public TransactionSource() {
+ super(new RateLimitedIterator<>(TransactionIterator.unbounded()));
+ }
+
+ private static class RateLimitedIterator<T> implements Iterator<T>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Iterator<T> inner;
+
+ private RateLimitedIterator(Iterator<T> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public T next() {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new RuntimeException(e);
+ }
+ return inner.next();
+ }
+ }
+}
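
The rate-limited iterator sleeps 100 ms per element, so the source emits roughly ten transactions per second, forever. A minimal DataStream sketch (job name illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.walkthrough.common.source.TransactionSource;

    public class TransactionStreamExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // ~10 transactions per second, cycling over the fixed data set
            env.addSource(new TransactionSource())
                .print();

            env.execute("transaction-stream-example");
        }
    }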
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/BoundedTransactionTableSource.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/BoundedTransactionTableSource.java
new file mode 100644
index 0000000..e51a341
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/BoundedTransactionTableSource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.walkthrough.common.source.TransactionRowInputFormat;
+
+/**
+ * A table source for reading a bounded set of transaction events.
+ *
+ * <p>This could be backed by a table, database, or other static data set.
+ */
+@PublicEvolving
+@SuppressWarnings({"deprecation", "unused"})
+public class BoundedTransactionTableSource extends InputFormatTableSource<Row> {
+ @Override
+ public InputFormat<Row, ?> getInputFormat() {
+ return new TransactionRowInputFormat();
+ }
+
+ @Override
+ public DataType getProducedDataType() {
+ return getTableSchema().toRowDataType();
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return TableSchema.builder()
+ .field("accountId", Types.LONG)
+ .field("timestamp", Types.SQL_TIMESTAMP)
+ .field("amount", Types.DOUBLE)
+ .build();
+ }
+}
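
A sketch of registering the bounded source in a batch table environment, mirroring how the SpendReport skeleton later in this commit wires it up (class name illustrative):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;

    public class BoundedSourceExample {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());

            // columns: accountId (LONG), timestamp (SQL_TIMESTAMP), amount (DOUBLE)
            Table transactions = tEnv.scan("transactions");
            System.out.println(transactions.getSchema());
        }
    }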
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
new file mode 100644
index 0000000..6c85717
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.walkthrough.common.sink.LoggerOutputFormat;
+
+/**
+ * A simple table sink for writing to stdout.
+ */
+@PublicEvolving
+@SuppressWarnings("deprecation")
+public class SpendReportTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+
+ private final TableSchema schema;
+
+ public SpendReportTableSink() {
+ this.schema = TableSchema
+ .builder()
+ .field("accountId", Types.LONG)
+ .field("timestamp", Types.SQL_TIMESTAMP)
+ .field("amount", Types.DOUBLE)
+ .build();
+ }
+
+ @Override
+ public void emitDataSet(DataSet<Row> dataSet) {
+ dataSet
+ .map(SpendReportTableSink::format)
+ .output(new LoggerOutputFormat());
+ }
+
+ @Override
+ public void emitDataStream(DataStream<Row> dataStream) {
+ dataStream
+ .map(SpendReportTableSink::format)
+ .writeUsingOutputFormat(new LoggerOutputFormat());
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return schema;
+ }
+
+ @Override
+ public DataType getConsumedDataType() {
+ return getTableSchema().toRowDataType();
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return getTableSchema().getFieldNames();
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return getTableSchema().getFieldTypes();
+ }
+
+ @Override
+ public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ return this;
+ }
+
+ private static String format(Row row) {
+ //noinspection MalformedFormatString
+ return String.format("%s, %s, $%.2f", row.getField(0), row.getField(1), row.getField(2));
+ }
+}
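
Since the sink implements both BatchTableSink and AppendStreamTableSink, the same registration works for batch and streaming jobs; every row reaches the log as "accountId, timestamp, $amount". A minimal batch wiring sketch (job name illustrative):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;
    import org.apache.flink.walkthrough.common.table.SpendReportTableSink;

    public class SinkExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
            tEnv.registerTableSink("spend_report", new SpendReportTableSink());

            // copy every transaction into the report; each row is logged at INFO level
            tEnv.scan("transactions").insertInto("spend_report");

            env.execute("sink-example");
        }
    }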
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/TruncateDateToHour.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/TruncateDateToHour.java
new file mode 100644
index 0000000..d9f35b9
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/TruncateDateToHour.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * A user defined function for rounding timestamps down to
+ * the nearest hour.
+ */
+@PublicEvolving
+@SuppressWarnings("unused")
+public class TruncateDateToHour extends ScalarFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final long ONE_HOUR = 60 * 60 * 1000;
+
+ public long eval(long timestamp) {
+ return timestamp - (timestamp % ONE_HOUR);
+ }
+
+ @Override
+ public TypeInformation<?> getResultType(Class<?>[] signature) {
+ return Types.SQL_TIMESTAMP;
+ }
+}
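
The rounding is plain modular arithmetic over epoch milliseconds, and getResultType reports the result back to the planner as SQL_TIMESTAMP. A small check of the arithmetic (class name and values illustrative):

    import org.apache.flink.walkthrough.common.table.TruncateDateToHour;

    public class TruncateExample {
        public static void main(String[] args) {
            long oneHour = 60 * 60 * 1000;
            long ts = 7 * oneHour + 42 * 60 * 1000; // 07:42:00 as epoch millis

            long floored = new TruncateDateToHour().eval(ts);
            System.out.println(floored == 7 * oneHour); // true: rounded down to 07:00
        }
    }

Once registered via tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour()), the function can be called by name inside expression strings, e.g. "truncateDateToHour(timestamp)".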
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/UnboundedTransactionTableSource.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/UnboundedTransactionTableSource.java
new file mode 100644
index 0000000..f6114da
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/UnboundedTransactionTableSource.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.DefinedRowtimeAttributes;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A table source for reading an unbounded set of transactions.
+ *
+ * <p>This table could be backed by a message queue or other streaming data source.
+ */
+@PublicEvolving
+@SuppressWarnings({"deprecation", "unused"})
+public class UnboundedTransactionTableSource
+ implements StreamTableSource<Row>,
+ DefinedRowtimeAttributes {
+
+ @Override
+ public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+ return execEnv
+ .addSource(new TransactionSource())
+ .map(transactionRowMapFunction())
+ .returns(getTableSchema().toRowType());
+ }
+
+ private MapFunction<Transaction, Row> transactionRowMapFunction() {
+ return transaction -> Row.of(
+ transaction.getAccountId(),
+ new Timestamp(transaction.getTimestamp()),
+ transaction.getAmount());
+ }
+
+ @Override
+ public DataType getProducedDataType() {
+ return getTableSchema().toRowDataType();
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return TableSchema.builder()
+ .field("accountId", Types.LONG)
+ .field("timestamp", Types.SQL_TIMESTAMP)
+ .field("amount", Types.DOUBLE)
+ .build();
+ }
+
+ @Override
+ public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
+ return Collections.singletonList(
+ new RowtimeAttributeDescriptor(
+ "timestamp",
+ new ExistingField("timestamp"),
+ new BoundedOutOfOrderTimestamps(100)));
+ }
+}
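
A streaming-side sketch, assuming the pre-Blink planner this commit targets; the rowtime descriptor above is what makes the timestamp column usable as an event-time attribute (job name illustrative):

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
    import org.apache.flink.walkthrough.common.table.UnboundedTransactionTableSource;

    public class UnboundedSourceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
            tEnv.registerTableSink("spend_report", new SpendReportTableSink());

            // "timestamp" is a rowtime attribute with 100 ms of allowed out-of-orderness
            tEnv.scan("transactions").insertInto("spend_report");

            env.execute("unbounded-source-example");
        }
    }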
diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-java/pom.xml
similarity index 54%
copy from docs/getting-started/tutorials/index.md
copy to flink-walkthroughs/flink-walkthrough-table-java/pom.xml
index 9b0c8f4..8834701 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/flink-walkthroughs/flink-walkthrough-table-java/pom.xml
@@ -1,10 +1,3 @@
----
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
-nav-parent_id: getting-started
-nav-pos: 1
----
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -23,3 +16,22 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-walkthroughs</artifactId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-walkthrough-table-java</artifactId>
+ <packaging>maven-archetype</packaging>
+
+</project>
diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/META-INF/maven/archetype-metadata.xml
similarity index 52%
copy from docs/getting-started/tutorials/index.md
copy to flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/META-INF/maven/archetype-metadata.xml
index 9b0c8f4..c0e8806 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -1,10 +1,3 @@
----
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
-nav-parent_id: getting-started
-nav-pos: 1
----
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -23,3 +16,21 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
+
+<archetype-descriptor
+ xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+ name="flink-walkthrough-table-java">
+ <fileSets>
+ <fileSet filtered="true" packaged="true" encoding="UTF-8">
+ <directory>src/main/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </fileSet>
+ <fileSet encoding="UTF-8">
+ <directory>src/main/resources</directory>
+ </fileSet>
+ </fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..81fdd66
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,263 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>${groupId}</groupId>
+ <artifactId>${artifactId}</artifactId>
+ <version>${version}</version>
+ <packaging>jar</packaging>
+
+ <name>Flink Walkthrough Table Java</name>
+ <url>https://flink.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>@project.version@</flink.version>
+ <java.version>1.8</java.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <maven.compiler.source>${java.version}</maven.compiler.source>
+ <maven.compiler.target>${java.version}</maven.compiler.target>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+		<!-- These dependencies are provided because they should not be packaged into the JAR file. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Table ecosystem -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+ <!-- Example:
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ -->
+
+		<!-- Add a logging framework to produce console output when running in the IDE. -->
+ <!-- These dependencies are excluded from the application JAR by default. -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.7</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Java Compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
+ <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+ <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.flink:force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>${package}.SpendReport</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+
+ <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <versionRange>[3.0.0,)</versionRange>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <versionRange>[3.1,)</versionRange>
+ <goals>
+ <goal>testCompile</goal>
+ <goal>compile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+ <!-- This profile helps to make things run out of the box in IntelliJ -->
+	<!-- It adds Flink's core classes to the runtime class path. -->
+ <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+ <profiles>
+ <profile>
+ <id>add-dependencies-for-IDEA</id>
+
+ <activation>
+ <property>
+ <name>idea.version</name>
+ </property>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/java/SpendReport.java b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/java/SpendReport.java
new file mode 100644
index 0000000..a3911ed
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/java/SpendReport.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
+import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;
+import org.apache.flink.walkthrough.common.table.TruncateDateToHour;
+
+/**
+ * Skeleton code for the table walkthrough.
+ */
+public class SpendReport {
+ public static void main(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+ tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+ tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+ tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+ tEnv
+ .scan("transactions")
+ .insertInto("spend_report");
+
+ env.execute("Spend Report");
+ }
+}
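
The skeleton above only copies transactions straight into the report; the walkthrough asks the reader to replace the scan-and-insert with an hourly per-account aggregation. One possible completed query, sketched with the string-based expression API (the authoritative solution is in the table_api.md walkthrough added by this commit) and dropped into main() in place of the plain scan:

    tEnv
        .scan("transactions")
        .select("accountId, truncateDateToHour(timestamp) as timestamp, amount")
        .groupBy("accountId, timestamp")
        .select("accountId, timestamp, amount.sum as total")
        .insertInto("spend_report");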
diff --git a/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8deec36
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=WARN, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.LoggerOutputFormat=INFO
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-scala/pom.xml
similarity index 54%
copy from docs/getting-started/tutorials/index.md
copy to flink-walkthroughs/flink-walkthrough-table-scala/pom.xml
index 9b0c8f4..d67a59c 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/pom.xml
@@ -1,10 +1,3 @@
----
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
-nav-parent_id: getting-started
-nav-pos: 1
----
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -23,3 +16,22 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-walkthroughs</artifactId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-walkthrough-table-scala</artifactId>
+ <packaging>maven-archetype</packaging>
+
+</project>
diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
similarity index 52%
copy from docs/getting-started/tutorials/index.md
copy to flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
index 9b0c8f4..977e5ca 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -1,10 +1,3 @@
----
-title: "Tutorials"
-nav-id: tutorials
-nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
-nav-parent_id: getting-started
-nav-pos: 1
----
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -23,3 +16,21 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
+
+<archetype-descriptor
+ xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+ name="flink-walkthrough-table-scala">
+ <fileSets>
+ <fileSet filtered="true" packaged="true" encoding="UTF-8">
+ <directory>src/main/scala</directory>
+ <includes>
+ <include>**/*.scala</include>
+ </includes>
+ </fileSet>
+ <fileSet encoding="UTF-8">
+ <directory>src/main/resources</directory>
+ </fileSet>
+ </fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..ca9a181
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,300 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>${groupId}</groupId>
+ <artifactId>${artifactId}</artifactId>
+ <version>${version}</version>
+ <packaging>jar</packaging>
+
+ <name>Flink Walkthrough Table Scala</name>
+ <url>https://flink.apache.org</url>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>@project.version@</flink.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.12</scala.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- Apache Flink dependencies -->
+		<!-- These dependencies are provided because they should not be packaged into the JAR file. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Table ecosystem -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+ <!-- Example:
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ -->
+
+		<!-- Add a logging framework to produce console output when running in the IDE. -->
+ <!-- These dependencies are excluded from the application JAR by default. -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.7</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+ <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.flink:force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>${package}.SpendReport</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Java Compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Eclipse Scala Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <!-- This profile helps to make things run out of the box in IntelliJ -->
+	<!-- It adds Flink's core classes to the runtime class path. -->
+ <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+ <profiles>
+ <profile>
+ <id>add-dependencies-for-IDEA</id>
+
+ <activation>
+ <property>
+ <name>idea.version</name>
+ </property>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8deec36
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=WARN, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.LoggerOutputFormat=INFO
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/scala/SpendReport.scala b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/scala/SpendReport.scala
new file mode 100644
index 0000000..47657b8
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/scala/SpendReport.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.walkthrough.common.table._
+
+object SpendReport {
+ def main(args: Array[String]): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = BatchTableEnvironment.create(env)
+
+ tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
+ tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+
+ val truncateDateToHour = new TruncateDateToHour
+
+ tEnv
+ .scan("transactions")
+ .insertInto("spend_report")
+
+ env.execute("Spend Report")
+ }
+}
diff --git a/flink-walkthroughs/pom.xml b/flink-walkthroughs/pom.xml
new file mode 100644
index 0000000..2733f59
--- /dev/null
+++ b/flink-walkthroughs/pom.xml
@@ -0,0 +1,95 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-walkthroughs</artifactId>
+ <packaging>pom</packaging>
+
+ <name>flink-walkthroughs</name>
+
+ <modules>
+ <module>flink-walkthrough-common</module>
+ <module>flink-walkthrough-table-java</module>
+ <module>flink-walkthrough-table-scala</module>
+ </modules>
+ <build>
+ <extensions>
+ <extension>
+ <groupId>org.apache.maven.archetype</groupId>
+ <artifactId>archetype-packaging</artifactId>
+ <version>2.2</version>
+ </extension>
+ </extensions>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-archetype-plugin</artifactId>
+ <version>2.2</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-archetype-plugin</artifactId>
+ <version>2.2</version><!--$NO-MVN-MAN-VER$-->
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <!-- deactivate the shade plugin for the walkthrough archetypes -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase/>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- use alternative delimiter for filtering resources -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <configuration>
+ <useDefaultDelimiters>false</useDefaultDelimiters>
+ <delimiters>
+ <delimiter>@</delimiter>
+ </delimiters>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </build>
+</project>
diff --git a/pom.xml b/pom.xml
index 6d77568..9300fef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@ under the License.
<module>flink-docs</module>
<module>flink-python</module>
<module>flink-ml-parent</module>
+ <module>flink-walkthroughs</module>
</modules>
<properties>