Posted to commits@flink.apache.org by fh...@apache.org on 2019/09/18 08:09:37 UTC

[flink] branch master updated (e2728c0 -> ee0d6fd)

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e2728c0  [FLINK-14067] Remove unused PlanExecutor.getOptimizerPlanAsJSON()
     new df8f9a5  [FLINK-12746][docs] Add DataStream API Walkthrough
     new ee0d6fd  [FLINK-12746] Add DataStream API Walkthrough

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/projectsetup/java_api_quickstart.md       |   2 +-
 docs/dev/projectsetup/java_api_quickstart.zh.md    |   2 +-
 docs/dev/projectsetup/scala_api_quickstart.md      |   2 +-
 docs/dev/projectsetup/scala_api_quickstart.zh.md   |   2 +-
 docs/fig/fraud-transactions.svg                    |  71 ++
 docs/getting-started/docker-playgrounds/index.md   |   2 +-
 .../getting-started/docker-playgrounds/index.zh.md |   2 +-
 docs/getting-started/examples/index.md             |   2 +-
 docs/getting-started/examples/index.zh.md          |   2 +-
 docs/getting-started/index.md                      |   5 +-
 docs/getting-started/tutorials/datastream_api.md   | 430 ----------
 .../getting-started/tutorials/datastream_api.zh.md | 430 ----------
 docs/getting-started/tutorials/index.md            |   2 +-
 docs/getting-started/tutorials/index.zh.md         |   2 +-
 .../getting-started/walkthroughs/datastream_api.md | 925 +++++++++++++++++++++
 .../walkthroughs/datastream_api.zh.md              | 925 +++++++++++++++++++++
 docs/getting-started/walkthroughs/index.md         |   2 +-
 docs/getting-started/walkthroughs/index.zh.md      |   2 +-
 docs/getting-started/walkthroughs/table_api.md     |   2 +-
 docs/getting-started/walkthroughs/table_api.zh.md  |   2 +-
 docs/index.md                                      |  26 +-
 docs/redirects/example_quickstart.md               |   2 +-
 docs/redirects/tutorials_datastream_api.md         |   2 +-
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 flink-end-to-end-tests/test-scripts/common.sh      |  12 +
 flink-end-to-end-tests/test-scripts/test_cli.sh    |  11 -
 ...throughs.sh => test_datastream_walkthroughs.sh} |  35 +-
 .../test-scripts/test_table_walkthroughs.sh        |   1 +
 .../flink/walkthrough/common/entity/Alert.java     |  61 ++
 .../flink/walkthrough/common/sink/AlertSink.java   |  43 +
 .../flink-walkthrough-datastream-java/pom.xml      |  26 +-
 .../META-INF/maven/archetype-metadata.xml          |  25 +-
 .../src/main/resources/archetype-resources/pom.xml | 225 +++++
 .../src/main/java/FraudDetectionJob.java           |  50 ++
 .../src/main/java/FraudDetector.java               |  48 ++
 .../src/main/resources/log4j.properties            |  24 +
 .../flink-walkthrough-datastream-scala/pom.xml     |  26 +-
 .../META-INF/maven/archetype-metadata.xml          |  25 +-
 .../src/main/resources/archetype-resources/pom.xml | 256 ++++++
 .../src/main/resources/log4j.properties            |  24 +
 .../src/main/scala/FraudDetectionJob.scala         |  51 ++
 .../src/main/scala/FraudDetector.scala             |  49 ++
 flink-walkthroughs/pom.xml                         |   2 +
 43 files changed, 2908 insertions(+), 932 deletions(-)
 create mode 100644 docs/fig/fraud-transactions.svg
 delete mode 100644 docs/getting-started/tutorials/datastream_api.md
 delete mode 100644 docs/getting-started/tutorials/datastream_api.zh.md
 create mode 100644 docs/getting-started/walkthroughs/datastream_api.md
 create mode 100644 docs/getting-started/walkthroughs/datastream_api.zh.md
 copy flink-end-to-end-tests/test-scripts/{test_table_walkthroughs.sh => test_datastream_walkthroughs.sh} (75%)
 create mode 100644 flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
 create mode 100644 flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
 copy docs/getting-started/docker-playgrounds/index.md => flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml (54%)
 copy docs/getting-started/docker-playgrounds/index.md => flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml (52%)
 create mode 100644 flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
 create mode 100644 flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
 create mode 100644 flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
 create mode 100644 flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
 copy docs/getting-started/docker-playgrounds/index.md => flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml (54%)
 copy docs/getting-started/docker-playgrounds/index.md => flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml (52%)
 create mode 100644 flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
 create mode 100644 flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
 create mode 100644 flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
 create mode 100644 flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala


[flink] 02/02: [FLINK-12746] Add DataStream API Walkthrough

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ee0d6fdf0604d74bd1cf9a6eb9cf5338ac1aa4f9
Author: Fabian Hueske <fh...@apache.org>
AuthorDate: Tue Sep 17 17:49:51 2019 +0200

    [FLINK-12746] Add DataStream API Walkthrough
    
    * Remove old DataStream tutorial
    * Update links to new API walkthrough
    * Update order of menu entries in "Getting Started" section
    * Update index pages to reflect updated "Getting Started" section.
---
 docs/dev/projectsetup/java_api_quickstart.md       |   2 +-
 docs/dev/projectsetup/java_api_quickstart.zh.md    |   2 +-
 docs/dev/projectsetup/scala_api_quickstart.md      |   2 +-
 docs/dev/projectsetup/scala_api_quickstart.zh.md   |   2 +-
 docs/getting-started/docker-playgrounds/index.md   |   2 +-
 .../getting-started/docker-playgrounds/index.zh.md |   2 +-
 docs/getting-started/examples/index.md             |   2 +-
 docs/getting-started/examples/index.zh.md          |   2 +-
 docs/getting-started/index.md                      |   5 +-
 docs/getting-started/tutorials/datastream_api.md   | 430 ---------------------
 .../getting-started/tutorials/datastream_api.zh.md | 430 ---------------------
 docs/getting-started/tutorials/index.md            |   2 +-
 docs/getting-started/tutorials/index.zh.md         |   2 +-
 docs/getting-started/walkthroughs/index.md         |   2 +-
 docs/getting-started/walkthroughs/index.zh.md      |   2 +-
 docs/index.md                                      |  26 +-
 docs/redirects/example_quickstart.md               |   2 +-
 docs/redirects/tutorials_datastream_api.md         |   2 +-
 18 files changed, 35 insertions(+), 884 deletions(-)

diff --git a/docs/dev/projectsetup/java_api_quickstart.md b/docs/dev/projectsetup/java_api_quickstart.md
index 2b27fa0..a5b0bc4 100644
--- a/docs/dev/projectsetup/java_api_quickstart.md
+++ b/docs/dev/projectsetup/java_api_quickstart.md
@@ -336,7 +336,7 @@ can run the application from the JAR file without additionally specifying the ma
 Write your application!
 
 If you are writing a streaming application and you are looking for inspiration what to write,
-take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/getting-started/tutorials/datastream_api.html#writing-a-flink-program).
+take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/getting-started/walkthroughs/datastream_api.html).
 
 If you are writing a batch processing application and you are looking for inspiration what to write,
 take a look at the [Batch Application Examples]({{ site.baseurl }}/dev/batch/examples.html).
diff --git a/docs/dev/projectsetup/java_api_quickstart.zh.md b/docs/dev/projectsetup/java_api_quickstart.zh.md
index 653fab4..4a89491 100644
--- a/docs/dev/projectsetup/java_api_quickstart.zh.md
+++ b/docs/dev/projectsetup/java_api_quickstart.zh.md
@@ -323,7 +323,7 @@ __注意:__ 如果你使用其他类而不是 *StreamingJob* 作为应用程
 Start writing your application!
 
 If you are about to write a stream processing application and are looking for inspiration for what to write,
-take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/zh/getting-started/tutorials/datastream_api.html#writing-a-flink-program)
+take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/zh/getting-started/walkthroughs/datastream_api.html)
 
 If you are about to write a batch processing application and are looking for inspiration for what to write,
 take a look at the [Batch Application Examples]({{ site.baseurl }}/zh/dev/batch/examples.html)
diff --git a/docs/dev/projectsetup/scala_api_quickstart.md b/docs/dev/projectsetup/scala_api_quickstart.md
index a9de50a..b03518a 100644
--- a/docs/dev/projectsetup/scala_api_quickstart.md
+++ b/docs/dev/projectsetup/scala_api_quickstart.md
@@ -212,7 +212,7 @@ can run the application from the JAR file without additionally specifying the m
 Write your application!
 
 If you are writing a streaming application and you are looking for inspiration what to write,
-take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/getting-started/tutorials/datastream_api.html#writing-a-flink-program)
+take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/getting-started/walkthroughs/datastream_api.html)
 
 If you are writing a batch processing application and you are looking for inspiration what to write,
 take a look at the [Batch Application Examples]({{ site.baseurl }}/dev/batch/examples.html)
diff --git a/docs/dev/projectsetup/scala_api_quickstart.zh.md b/docs/dev/projectsetup/scala_api_quickstart.zh.md
index 187f295..888682d 100644
--- a/docs/dev/projectsetup/scala_api_quickstart.zh.md
+++ b/docs/dev/projectsetup/scala_api_quickstart.zh.md
@@ -204,7 +204,7 @@ __注意:__ 如果你使用其他类而不是 *StreamingJob* 作为应用程序
 Start writing your application!
 
 If you are about to write a stream processing application and are looking for inspiration for what to write,
-take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/zh/getting-started/tutorials/datastream_api.html#writing-a-flink-program)
+take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/zh/getting-started/walkthroughs/datastream_api.html)
 
 If you are about to write a batch processing application and are looking for inspiration for what to write,
 take a look at the [Batch Application Examples]({{ site.baseurl }}/zh/dev/batch/examples.html)
diff --git a/docs/getting-started/docker-playgrounds/index.md b/docs/getting-started/docker-playgrounds/index.md
index 2051e46..2f3c029 100644
--- a/docs/getting-started/docker-playgrounds/index.md
+++ b/docs/getting-started/docker-playgrounds/index.md
@@ -3,7 +3,7 @@ title: Docker Playgrounds
 nav-id: docker-playgrounds
 nav-title: '<i class="fa fa-ship title appetizer" aria-hidden="true"></i> Docker Playgrounds'
 nav-parent_id: getting-started
-nav-pos: 3
+nav-pos: 20
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/docker-playgrounds/index.zh.md b/docs/getting-started/docker-playgrounds/index.zh.md
index 2051e46..2f3c029 100644
--- a/docs/getting-started/docker-playgrounds/index.zh.md
+++ b/docs/getting-started/docker-playgrounds/index.zh.md
@@ -3,7 +3,7 @@ title: Docker Playgrounds
 nav-id: docker-playgrounds
 nav-title: '<i class="fa fa-ship title appetizer" aria-hidden="true"></i> Docker Playgrounds'
 nav-parent_id: getting-started
-nav-pos: 3
+nav-pos: 20
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/examples/index.md b/docs/getting-started/examples/index.md
index d4d315c..ad090b3 100644
--- a/docs/getting-started/examples/index.md
+++ b/docs/getting-started/examples/index.md
@@ -3,7 +3,7 @@ title: Examples
 nav-id: examples
 nav-title: '<i class="fa fa-file-code-o title appetizer" aria-hidden="true"></i> Examples'
 nav-parent_id: getting-started
-nav-pos: 3
+nav-pos: 40
 nav-show_overview: true
 ---
 <!--
diff --git a/docs/getting-started/examples/index.zh.md b/docs/getting-started/examples/index.zh.md
index e0925d8..99298a8 100644
--- a/docs/getting-started/examples/index.zh.md
+++ b/docs/getting-started/examples/index.zh.md
@@ -3,7 +3,7 @@ title: 示例
 nav-id: examples
 nav-title: '<i class="fa fa-file-code-o title appetizer" aria-hidden="true"></i> 示例'
 nav-parent_id: getting-started
-nav-pos: 3
+nav-pos: 40
 nav-show_overview: true
 ---
 <!--
diff --git a/docs/getting-started/index.md b/docs/getting-started/index.md
index 861be99..6f02e88 100644
--- a/docs/getting-started/index.md
+++ b/docs/getting-started/index.md
@@ -43,10 +43,7 @@ The **Docker Playgrounds** provide sandboxed Flink environments that are set up
 The **Code Walkthroughs** are the best way to get started and introduce you step by step to an API.
 A walkthrough provides instructions to bootstrap a small Flink project with a code skeleton and shows how to extend it to a simple application.
 
-<!-- 
-* The [**DataStream API**]() code walkthrough shows how to implement a simple DataStream application and how to extend it to be stateful and use timers.
--->
-* The [**DataStream API**](./tutorials/datastream_api.html) tutorial shows how to implement a basic DataStream application. The DataStream API is Flink's main abstraction to implement stateful streaming applications with sophisticated time semantics in Java or Scala.
+* The [**DataStream API**](./walkthroughs/datastream_api.html) code walkthrough shows how to implement a simple DataStream application and how to extend it to be stateful and use timers. The DataStream API is Flink's main abstraction to implement stateful streaming applications with sophisticated time semantics in Java or Scala.
 
 * The [**Table API**](./walkthroughs/table_api.html) code walkthrough shows how to implement a simple Table API query on a batch source and how to evolve it into a continuous query on a streaming source. The Table API is Flink's language-embedded, relational API to write SQL-like queries in Java or Scala, which are automatically optimized similarly to SQL queries. Table API queries can be executed on batch or streaming data with identical syntax and semantics.
 
diff --git a/docs/getting-started/tutorials/datastream_api.md b/docs/getting-started/tutorials/datastream_api.md
deleted file mode 100644
index b0d242d..0000000
--- a/docs/getting-started/tutorials/datastream_api.md
+++ /dev/null
@@ -1,430 +0,0 @@
----
-title: "DataStream API Tutorial"
-nav-title: DataStream API
-nav-parent_id: apitutorials
-nav-pos: 10
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-In this guide we will start from scratch and go from setting up a Flink project to running
-a streaming analysis program on a Flink cluster.
-
-Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
-read this channel in Flink and count the number of bytes that each user edits within
-a given window of time. This is easy enough to implement in a few minutes using Flink, but it will
-give you a good foundation from which to start building more complex analysis programs on your own.
-
-## Setting up a Maven Project
-
-We are going to use a Flink Maven Archetype for creating our project structure. Please
-see [Java API Quickstart]({{ site.baseurl }}/dev/projectsetup/java_api_quickstart.html) for more details
-about this. For our purposes, the command to run is this:
-
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-quickstart-java \{% unless site.is_stable %}
-    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=wiki-edits \
-    -DartifactId=wiki-edits \
-    -Dversion=0.1 \
-    -Dpackage=wikiedits \
-    -DinteractiveMode=false
-{% endhighlight %}
-
-{% unless site.is_stable %}
-<p style="border-radius: 5px; padding: 5px" class="bg-danger">
-    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">official Maven documentation</a>.
-</p>
-{% endunless %}
-
-You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
-Maven will create a project structure that looks like this:
-
-{% highlight bash %}
-$ tree wiki-edits
-wiki-edits/
-├── pom.xml
-└── src
-    └── main
-        ├── java
-        │   └── wikiedits
-        │       ├── BatchJob.java
-        │       └── StreamingJob.java
-        └── resources
-            └── log4j.properties
-{% endhighlight %}
-
-In the root directory there is our `pom.xml` file, which already has the Flink dependencies
-added, and several example Flink programs in `src/main/java`. We can delete the example
-programs, since we are going to start from scratch:
-
-{% highlight bash %}
-$ rm wiki-edits/src/main/java/wikiedits/*.java
-{% endhighlight %}
-
-As a last step we need to add the Flink Wikipedia connector as a dependency so that we can
-use it in our program. Edit the `dependencies` section of the `pom.xml` so that it looks like this:
-
-{% highlight xml %}
-<dependencies>
-    <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-java</artifactId>
-        <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-streaming-java_2.11</artifactId>
-        <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-clients_2.11</artifactId>
-        <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-connector-wikiedits_2.11</artifactId>
-        <version>${flink.version}</version>
-    </dependency>
-</dependencies>
-{% endhighlight %}
-
-Notice the `flink-connector-wikiedits_2.11` dependency that was added. (This example and
-the Wikipedia connector were inspired by the *Hello Samza* example of Apache Samza.)
-
-## Writing a Flink Program
-
-It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
-create the file `src/main/java/wikiedits/WikipediaAnalysis.java`:
-
-{% highlight java %}
-package wikiedits;
-
-public class WikipediaAnalysis {
-
-    public static void main(String[] args) throws Exception {
-
-    }
-}
-{% endhighlight %}
-
-The program is very basic for now, but we will fill it in as we go. Note that I won't give
-import statements here since IDEs can add them automatically. At the end of this section I'll show
-the complete code with import statements in case you want to skip ahead and enter it in your
-editor.
-
-The first step in a Flink program is to create a `StreamExecutionEnvironment`
-(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
-parameters and create sources for reading from external systems. So let's go ahead and add
-this to the main method:
-
-{% highlight java %}
-StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-{% endhighlight %}
-
-Next we will create a source that reads from the Wikipedia IRC log:
-
-{% highlight java %}
-DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
-{% endhighlight %}
-
-This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
-the purposes of this example we are interested in determining the number of added or removed
-bytes that each user causes in a certain time window, let's say five seconds. For this we first
-have to specify that we want to key the stream on the user name, that is to say that operations
-on this stream should take the user name into account. In our case the summation of edited bytes in the windows
-should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
-
-{% highlight java %}
-KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
-    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
-        @Override
-        public String getKey(WikipediaEditEvent event) {
-            return event.getUser();
-        }
-    });
-{% endhighlight %}
-
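-If you are on Java 8, the same keying can be expressed more compactly with a method
-reference (a sketch; if Flink's type extraction cannot infer the key type for a
-lambda, fall back to the explicit `KeySelector` above):
-
-{% highlight java %}
-// Equivalent keying via a method reference; the key is still the user name.
-KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
-    .keyBy(WikipediaEditEvent::getUser);
-{% endhighlight %}
-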
-This gives us a Stream of `WikipediaEditEvent` that has a `String` key, the user name.
-We can now specify that we want to have windows imposed on this stream and compute a
-result based on elements in these windows. A window specifies a slice of a Stream
-on which to perform a computation. Windows are required when computing aggregations
-on an infinite stream of elements. In our example we will say
-that we want to aggregate the sum of edited bytes for every five seconds:
-
-{% highlight java %}
-DataStream<Tuple2<String, Long>> result = keyedEdits
-    .timeWindow(Time.seconds(5))
-    .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
-        @Override
-        public Tuple2<String, Long> createAccumulator() {
-            return new Tuple2<>("", 0L);
-        }
-
-        @Override
-        public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
-            accumulator.f0 = value.getUser();
-            accumulator.f1 += value.getByteDiff();
-            return accumulator;
-        }
-
-        @Override
-        public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
-            return accumulator;
-        }
-
-        @Override
-        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
-            return new Tuple2<>(a.f0, a.f1 + b.f1);
-        }
-    });
-{% endhighlight %}
-
-The first call, `.timeWindow()`, specifies that we want to have tumbling (non-overlapping) windows
-of five seconds. The second call specifies an *Aggregate transformation* on each window slice for
-each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
-difference of every edit in that time window for a user. The resulting Stream now contains
-a `Tuple2<String, Long>` for every user, which gets emitted every five seconds.
-
-The only thing left to do is print the stream to the console and start execution:
-
-{% highlight java %}
-result.print();
-
-see.execute();
-{% endhighlight %}
-
-That last call is necessary to start the actual Flink job. All operations, such as creating
-sources, transformations, and sinks, only build up a graph of internal operations. Only when
-`execute()` is called is this graph of operations deployed to a cluster or executed on your local
-machine.
-
-The complete code so far is this:
-
-{% highlight java %}
-package wikiedits;
-
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
-import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
-
-public class WikipediaAnalysis {
-
-  public static void main(String[] args) throws Exception {
-
-    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
-
-    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
-      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
-        @Override
-        public String getKey(WikipediaEditEvent event) {
-          return event.getUser();
-        }
-      });
-
-    DataStream<Tuple2<String, Long>> result = keyedEdits
-      .timeWindow(Time.seconds(5))
-      .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
-        @Override
-        public Tuple2<String, Long> createAccumulator() {
-          return new Tuple2<>("", 0L);
-        }
-
-        @Override
-        public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
-          accumulator.f0 = value.getUser();
-          accumulator.f1 += value.getByteDiff();
-          return accumulator;
-        }
-
-        @Override
-        public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
-          return accumulator;
-        }
-
-        @Override
-        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
-          return new Tuple2<>(a.f0, a.f1 + b.f1);
-        }
-      });
-
-    result.print();
-
-    see.execute();
-  }
-}
-{% endhighlight %}
-
-You can run this example in your IDE or on the command line, using Maven:
-
-{% highlight bash %}
-$ mvn clean package
-$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
-{% endhighlight %}
-
-The first command builds our project and the second executes our main class. The output should be
-similar to this:
-
-{% highlight bash %}
-1> (Fenix down,114)
-6> (AnomieBOT,155)
-8> (BD2412bot,-3690)
-7> (IgnorantArmies,49)
-3> (Ckh3111,69)
-5> (Slade360,0)
-7> (Narutolovehinata5,2195)
-6> (Vuyisa2001,79)
-4> (Ms Sarah Welch,269)
-4> (KasparBot,-245)
-{% endhighlight %}
-
-The number in front of each line tells you on which parallel instance of the print sink the output
-was produced.
-
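-The prefix numbers depend on the parallelism of your environment. As a small, optional
-tweak (an assumption, not part of the original setup), you can pin the job to a single
-parallel instance to get one deterministic output stream while experimenting:
-
-{% highlight java %}
-// Run the whole job with parallelism 1, so all results
-// are printed by the same parallel instance of the sink.
-see.setParallelism(1);
-{% endhighlight %}
-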
-This should get you started with writing your own Flink programs. To learn more
-you can check out our guides
-about [basic concepts]({{ site.baseurl }}/dev/api_concepts.html) and the
-[DataStream API]({{ site.baseurl }}/dev/datastream_api.html). Stick
-around for the bonus exercise if you want to learn about setting up a Flink cluster on
-your own machine and writing results to [Kafka](http://kafka.apache.org).
-
-## Bonus Exercise: Running on a Cluster and Writing to Kafka
-
-Please follow our [local setup tutorial](local_setup.html) for setting up a Flink distribution
-on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/0110/documentation.html#quickstart)
-for setting up a Kafka installation before we proceed.
-
-As a first step, we have to add the Flink Kafka connector as a dependency so that we can
-use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
-
-{% highlight xml %}
-<dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
-    <version>${flink.version}</version>
-</dependency>
-{% endhighlight %}
-
-Next, we need to modify our program. We'll remove the `print()` sink and instead use a
-Kafka sink. The new code looks like this:
-
-{% highlight java %}
-
-result
-    .map(new MapFunction<Tuple2<String,Long>, String>() {
-        @Override
-        public String map(Tuple2<String, Long> tuple) {
-            return tuple.toString();
-        }
-    })
-    .addSink(new FlinkKafkaProducer011<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
-{% endhighlight %}
-
-The related classes also need to be imported:
-{% highlight java %}
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.common.functions.MapFunction;
-{% endhighlight %}
-
-Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
-a `MapFunction`. We are doing this because it is easier to write plain strings to Kafka. Then,
-we create a Kafka sink. You might have to adapt the hostname and port to your setup. `"wiki-result"`
-is the name of the Kafka topic that we are going to create next, before running our program.
-Build the project using Maven because we need the jar file for running on the cluster:
-
-{% highlight bash %}
-$ mvn clean package
-{% endhighlight %}
-
-The resulting jar file will be in the `target` subfolder: `target/wiki-edits-0.1.jar`. We'll use
-this later.
-
-Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go
-to the location where you installed Flink and start a local cluster:
-
-{% highlight bash %}
-$ cd my/flink/directory
-$ bin/start-cluster.sh
-{% endhighlight %}
-
-We also have to create the Kafka topic, so that our program can write to it:
-
-{% highlight bash %}
-$ cd my/kafka/directory
-$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wiki-result
-{% endhighlight %}
-
-Now we are ready to run our jar file on the local Flink cluster:
-{% highlight bash %}
-$ cd my/flink/directory
-$ bin/flink run -c wikiedits.WikipediaAnalysis target/wiki-edits-0.1.jar
-{% endhighlight %}
-
-The output of that command should look similar to this, if everything went according to plan:
-
-{% highlight plain %}
-03/08/2016 15:09:27 Job execution switched to status RUNNING.
-03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
-03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from CREATED to SCHEDULED
-03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from SCHEDULED to DEPLOYING
-03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from DEPLOYING to RUNNING
-03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
-{% endhighlight %}
-
-You can see how the individual operators start running. There are only two, because
-the operations after the window get folded into one operation for performance reasons. In Flink
-we call this *chaining*.
-
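-Chaining happens automatically, but it can be controlled per operator. As a sketch
-(an optional variation, not part of the original program), disabling chaining on the
-map operator would make it show up as a separate task in the dashboard:
-
-{% highlight java %}
-// Disable chaining for the map operator, so it is executed as its own
-// task instead of being fused with the surrounding operators.
-result
-    .map(new MapFunction<Tuple2<String, Long>, String>() {
-        @Override
-        public String map(Tuple2<String, Long> tuple) {
-            return tuple.toString();
-        }
-    })
-    .disableChaining()
-    .addSink(new FlinkKafkaProducer011<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
-{% endhighlight %}
-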
-You can observe the output of the program by inspecting the Kafka topic using the Kafka
-console consumer:
-
-{% highlight bash %}
-bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wiki-result
-{% endhighlight %}
-
-You can also check out the Flink dashboard which should be running at [http://localhost:8081](http://localhost:8081).
-You get an overview of your cluster resources and running jobs:
-
-<a href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-overview.png" ><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-overview.png" alt="JobManager Overview"/></a>
-
-If you click on your running job you will get a view where you can inspect individual operations
-and, for example, see the number of processed elements:
-
-<a href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" ><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" alt="Example Job View"/></a>
-
-This concludes our little tour of Flink. If you have any questions, please don't hesitate to ask on our [Mailing Lists](https://flink.apache.org/community.html#mailing-lists).
-
-{% top %}
diff --git a/docs/getting-started/tutorials/datastream_api.zh.md b/docs/getting-started/tutorials/datastream_api.zh.md
deleted file mode 100644
index fd6e3bb..0000000
--- a/docs/getting-started/tutorials/datastream_api.zh.md
+++ /dev/null
@@ -1,430 +0,0 @@
----
-title: "DataStream API 教程"
-nav-title: DataStream API
-nav-parent_id: apitutorials
-nav-pos: 10
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-In this guide we will start from scratch and go from setting up a Flink project to running
-a streaming analysis program on a Flink cluster.
-
-Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
-read this channel in Flink and count the number of bytes that each user edits within
-a given window of time. This is easy enough to implement in a few minutes using Flink, but it will
-give you a good foundation from which to start building more complex analysis programs on your own.
-
-## Setting up a Maven Project
-
-We are going to use a Flink Maven Archetype for creating our project structure. Please
-see [Java API Quickstart]({{ site.baseurl }}/dev/projectsetup/java_api_quickstart.html) for more details
-about this. For our purposes, the command to run is this:
-
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-quickstart-java \{% unless site.is_stable %}
-    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=wiki-edits \
-    -DartifactId=wiki-edits \
-    -Dversion=0.1 \
-    -Dpackage=wikiedits \
-    -DinteractiveMode=false
-{% endhighlight %}
-
-{% unless site.is_stable %}
-<p style="border-radius: 5px; padding: 5px" class="bg-danger">
-    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">official Maven documentation</a>.
-</p>
-{% endunless %}
-
-You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
-Maven will create a project structure that looks like this:
-
-{% highlight bash %}
-$ tree wiki-edits
-wiki-edits/
-├── pom.xml
-└── src
-    └── main
-        ├── java
-        │   └── wikiedits
-        │       ├── BatchJob.java
-        │       └── StreamingJob.java
-        └── resources
-            └── log4j.properties
-{% endhighlight %}
-
-In the root directory there is our `pom.xml` file, which already has the Flink dependencies
-added, and several example Flink programs in `src/main/java`. We can delete the example
-programs, since we are going to start from scratch:
-
-{% highlight bash %}
-$ rm wiki-edits/src/main/java/wikiedits/*.java
-{% endhighlight %}
-
-As a last step we need to add the Flink Wikipedia connector as a dependency so that we can
-use it in our program. Edit the `dependencies` section of the `pom.xml` so that it looks like this:
-
-{% highlight xml %}
-<dependencies>
-    <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-java</artifactId>
-        <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-streaming-java_2.11</artifactId>
-        <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-clients_2.11</artifactId>
-        <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-connector-wikiedits_2.11</artifactId>
-        <version>${flink.version}</version>
-    </dependency>
-</dependencies>
-{% endhighlight %}
-
-Notice the `flink-connector-wikiedits_2.11` dependency that was added. (This example and
-the Wikipedia connector were inspired by the *Hello Samza* example of Apache Samza.)
-
-## Writing a Flink Program
-
-It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
-create the file `src/main/java/wikiedits/WikipediaAnalysis.java`:
-
-{% highlight java %}
-package wikiedits;
-
-public class WikipediaAnalysis {
-
-    public static void main(String[] args) throws Exception {
-
-    }
-}
-{% endhighlight %}
-
-The program is very basic for now, but we will fill it in as we go. Note that I won't give
-import statements here since IDEs can add them automatically. At the end of this section I'll show
-the complete code with import statements in case you want to skip ahead and enter it in your
-editor.
-
-The first step in a Flink program is to create a `StreamExecutionEnvironment`
-(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
-parameters and create sources for reading from external systems. So let's go ahead and add
-this to the main method:
-
-{% highlight java %}
-StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-{% endhighlight %}
-
-Next we will create a source that reads from the Wikipedia IRC log:
-
-{% highlight java %}
-DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
-{% endhighlight %}
-
-This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
-the purposes of this example we are interested in determining the number of added or removed
-bytes that each user causes in a certain time window, let's say five seconds. For this we first
-have to specify that we want to key the stream on the user name, that is to say that operations
-on this stream should take the user name into account. In our case the summation of edited bytes in the windows
-should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
-
-{% highlight java %}
-KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
-    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
-        @Override
-        public String getKey(WikipediaEditEvent event) {
-            return event.getUser();
-        }
-    });
-{% endhighlight %}
-
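-If you are on Java 8, the same keying can be expressed more compactly with a method
-reference (a sketch; if Flink's type extraction cannot infer the key type for a
-lambda, fall back to the explicit `KeySelector` above):
-
-{% highlight java %}
-// Equivalent keying via a method reference; the key is still the user name.
-KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
-    .keyBy(WikipediaEditEvent::getUser);
-{% endhighlight %}
-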
-This gives us a Stream of `WikipediaEditEvent` that has a `String` key, the user name.
-We can now specify that we want to have windows imposed on this stream and compute a
-result based on elements in these windows. A window specifies a slice of a Stream
-on which to perform a computation. Windows are required when computing aggregations
-on an infinite stream of elements. In our example we will say
-that we want to aggregate the sum of edited bytes for every five seconds:
-
-{% highlight java %}
-DataStream<Tuple2<String, Long>> result = keyedEdits
-    .timeWindow(Time.seconds(5))
-    .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
-        @Override
-        public Tuple2<String, Long> createAccumulator() {
-            return new Tuple2<>("", 0L);
-        }
-
-        @Override
-        public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
-            accumulator.f0 = value.getUser();
-            accumulator.f1 += value.getByteDiff();
-            return accumulator;
-        }
-
-        @Override
-        public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
-            return accumulator;
-        }
-
-        @Override
-        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
-            return new Tuple2<>(a.f0, a.f1 + b.f1);
-        }
-    });
-{% endhighlight %}
-
-The first call, `.timeWindow()`, specifies that we want to have tumbling (non-overlapping) windows
-of five seconds. The second call specifies an *Aggregate transformation* on each window slice for
-each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
-difference of every edit in that time window for a user. The resulting Stream now contains
-a `Tuple2<String, Long>` for every user, which gets emitted every five seconds.
-
-The only thing left to do is print the stream to the console and start execution:
-
-{% highlight java %}
-result.print();
-
-see.execute();
-{% endhighlight %}
-
-That last call is necessary to start the actual Flink job. All operations, such as creating
-sources, transformations, and sinks, only build up a graph of internal operations. Only when
-`execute()` is called is this graph of operations deployed to a cluster or executed on your local
-machine.
-
-The complete code so far is this:
-
-{% highlight java %}
-package wikiedits;
-
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
-import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
-
-public class WikipediaAnalysis {
-
-  public static void main(String[] args) throws Exception {
-
-    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
-
-    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
-      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
-        @Override
-        public String getKey(WikipediaEditEvent event) {
-          return event.getUser();
-        }
-      });
-
-    DataStream<Tuple2<String, Long>> result = keyedEdits
-      .timeWindow(Time.seconds(5))
-      .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
-        @Override
-        public Tuple2<String, Long> createAccumulator() {
-          return new Tuple2<>("", 0L);
-        }
-
-        @Override
-        public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
-          accumulator.f0 = value.getUser();
-          accumulator.f1 += value.getByteDiff();
-          return accumulator;
-        }
-
-        @Override
-        public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
-          return accumulator;
-        }
-
-        @Override
-        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
-          return new Tuple2<>(a.f0, a.f1 + b.f1);
-        }
-      });
-
-    result.print();
-
-    see.execute();
-  }
-}
-{% endhighlight %}
-
-You can run this example in your IDE or on the command line, using Maven:
-
-{% highlight bash %}
-$ mvn clean package
-$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
-{% endhighlight %}
-
-The first command builds our project and the second executes our main class. The output should be
-similar to this:
-
-{% highlight bash %}
-1> (Fenix down,114)
-6> (AnomieBOT,155)
-8> (BD2412bot,-3690)
-7> (IgnorantArmies,49)
-3> (Ckh3111,69)
-5> (Slade360,0)
-7> (Narutolovehinata5,2195)
-6> (Vuyisa2001,79)
-4> (Ms Sarah Welch,269)
-4> (KasparBot,-245)
-{% endhighlight %}
-
-The number in front of each line tells you on which parallel instance of the print sink the output
-was produced.
-
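-The prefix numbers depend on the parallelism of your environment. As a small, optional
-tweak (an assumption, not part of the original setup), you can pin the job to a single
-parallel instance to get one deterministic output stream while experimenting:
-
-{% highlight java %}
-// Run the whole job with parallelism 1, so all results
-// are printed by the same parallel instance of the sink.
-see.setParallelism(1);
-{% endhighlight %}
-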
-This should get you started with writing your own Flink programs. To learn more
-you can check out our guides
-about [basic concepts]({{ site.baseurl }}/dev/api_concepts.html) and the
-[DataStream API]({{ site.baseurl }}/dev/datastream_api.html). Stick
-around for the bonus exercise if you want to learn about setting up a Flink cluster on
-your own machine and writing results to [Kafka](http://kafka.apache.org).
-
-## Bonus Exercise: Running on a Cluster and Writing to Kafka
-
-Please follow our [local setup tutorial](local_setup.html) for setting up a Flink distribution
-on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/0110/documentation.html#quickstart)
-for setting up a Kafka installation before we proceed.
-
-As a first step, we have to add the Flink Kafka connector as a dependency so that we can
-use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
-
-{% highlight xml %}
-<dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
-    <version>${flink.version}</version>
-</dependency>
-{% endhighlight %}
-
-Next, we need to modify our program. We'll remove the `print()` sink and instead use a
-Kafka sink. The new code looks like this:
-
-{% highlight java %}
-
-result
-    .map(new MapFunction<Tuple2<String,Long>, String>() {
-        @Override
-        public String map(Tuple2<String, Long> tuple) {
-            return tuple.toString();
-        }
-    })
-    .addSink(new FlinkKafkaProducer011<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
-{% endhighlight %}
-
-The related classes also need to be imported:
-{% highlight java %}
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.common.functions.MapFunction;
-{% endhighlight %}
-
-Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
-a `MapFunction`. We are doing this because it is easier to write plain strings to Kafka. Then,
-we create a Kafka sink. You might have to adapt the hostname and port to your setup. `"wiki-result"`
-is the name of the Kafka topic that we are going to create next, before running our program.
-Build the project using Maven because we need the jar file for running on the cluster:
-
-{% highlight bash %}
-$ mvn clean package
-{% endhighlight %}
-
-The resulting jar file will be in the `target` subfolder: `target/wiki-edits-0.1.jar`. We'll use
-this later.
-
-Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go
-to the location where you installed Flink and start a local cluster:
-
-{% highlight bash %}
-$ cd my/flink/directory
-$ bin/start-cluster.sh
-{% endhighlight %}
-
-We also have to create the Kafka topic, so that our program can write to it:
-
-{% highlight bash %}
-$ cd my/kafka/directory
-$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wiki-result
-{% endhighlight %}
-
-Now we are ready to run our jar file on the local Flink cluster:
-{% highlight bash %}
-$ cd my/flink/directory
-$ bin/flink run -c wikiedits.WikipediaAnalysis target/wiki-edits-0.1.jar
-{% endhighlight %}
-
-The output of that command should look similar to this, if everything went according to plan:
-
-{% highlight plain %}
-03/08/2016 15:09:27 Job execution switched to status RUNNING.
-03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
-03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from CREATED to SCHEDULED
-03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from SCHEDULED to DEPLOYING
-03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from DEPLOYING to RUNNING
-03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
-{% endhighlight %}
-
-You can see how the individual operators start running. There are only two, because
-the operations after the window get folded into one operation for performance reasons. In Flink
-we call this *chaining*.
-
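-Chaining happens automatically, but it can be controlled per operator. As a sketch
-(an optional variation, not part of the original program), disabling chaining on the
-map operator would make it show up as a separate task in the dashboard:
-
-{% highlight java %}
-// Disable chaining for the map operator, so it is executed as its own
-// task instead of being fused with the surrounding operators.
-result
-    .map(new MapFunction<Tuple2<String, Long>, String>() {
-        @Override
-        public String map(Tuple2<String, Long> tuple) {
-            return tuple.toString();
-        }
-    })
-    .disableChaining()
-    .addSink(new FlinkKafkaProducer011<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
-{% endhighlight %}
-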
-You can observe the output of the program by inspecting the Kafka topic using the Kafka
-console consumer:
-
-{% highlight bash %}
-bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wiki-result
-{% endhighlight %}
-
-You can also check out the Flink dashboard which should be running at [http://localhost:8081](http://localhost:8081).
-You get an overview of your cluster resources and running jobs:
-
-<a href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-overview.png" ><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-overview.png" alt="JobManager Overview"/></a>
-
-If you click on your running job you will get a view where you can inspect individual operations
-and, for example, see the number of processed elements:
-
-<a href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" ><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" alt="Example Job View"/></a>
-
-This concludes our little tour of Flink. If you have any questions, please don't hesitate to ask on our [Mailing Lists](https://flink.apache.org/community.html#mailing-lists).
-
-{% top %}
diff --git a/docs/getting-started/tutorials/index.md b/docs/getting-started/tutorials/index.md
index f283489..55ff249 100644
--- a/docs/getting-started/tutorials/index.md
+++ b/docs/getting-started/tutorials/index.md
@@ -3,7 +3,7 @@ title: "Tutorials"
 nav-id: tutorials
 nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials'
 nav-parent_id: getting-started
-nav-pos: 2
+nav-pos: 30
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/tutorials/index.zh.md b/docs/getting-started/tutorials/index.zh.md
index ac6715b..3900614 100644
--- a/docs/getting-started/tutorials/index.zh.md
+++ b/docs/getting-started/tutorials/index.zh.md
@@ -3,7 +3,7 @@ title: "教程"
 nav-id: tutorials
 nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> 教程'
 nav-parent_id: getting-started
-nav-pos: 2
+nav-pos: 30
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/walkthroughs/index.md b/docs/getting-started/walkthroughs/index.md
index ae2f536..02b09ee 100644
--- a/docs/getting-started/walkthroughs/index.md
+++ b/docs/getting-started/walkthroughs/index.md
@@ -3,7 +3,7 @@ title: "Code Walkthroughs"
 nav-id: walkthroughs
 nav-title: '<i class="fa fa-bolt title appetizer" aria-hidden="true"></i> Code Walkthroughs'
 nav-parent_id: getting-started
-nav-pos: 1
+nav-pos: 10
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/walkthroughs/index.zh.md b/docs/getting-started/walkthroughs/index.zh.md
index ae2f536..02b09ee 100644
--- a/docs/getting-started/walkthroughs/index.zh.md
+++ b/docs/getting-started/walkthroughs/index.zh.md
@@ -3,7 +3,7 @@ title: "Code Walkthroughs"
 nav-id: walkthroughs
 nav-title: '<i class="fa fa-bolt title appetizer" aria-hidden="true"></i> Code Walkthroughs'
 nav-parent_id: getting-started
-nav-pos: 1
+nav-pos: 10
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/index.md b/docs/index.md
index 9d35c3f..9dddcf3 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -23,21 +23,35 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-
-
 This documentation is for Apache Flink version {{ site.version_title }}. These pages were built at: {% build_time %}.
 
 Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.
 
 ## First Steps
 
-- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommend you read these sections first.
+* **Code Walkthroughs**: Follow step-by-step guides and implement a simple application or query in one of Flink's APIs. 
+  * [Implement a DataStream application](./getting-started/walkthroughs/datastream_api.html)
+  * [Write a Table API query](./getting-started/walkthroughs/table_api.html)
+
+* **Docker Playgrounds**: Set up a sandboxed Flink environment in just a few minutes to explore and play with Flink.
+  * [Run and manage Flink streaming applications](./getting-started/docker-playgrounds/flink-operations-playground.html)
 
-- **Tutorials**: 
-  * [Implement and run a DataStream application](./getting-started/tutorials/datastream_api.html)
+* **Tutorials**: Install Flink on your local machine.
   * [Setup a local Flink cluster](./getting-started/tutorials/local_setup.html)
 
-- **Programming Guides**: You can read our guides about [basic API concepts](dev/api_concepts.html) and the [DataStream API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs.
+* **Concepts**: Learn about Flink's basic concepts to better understand the documentation.
+  * [Dataflow Programming Model](concepts/programming-model.html)
+  * [Distributed Runtime](concepts/runtime.html)
+  * [Glossary](concepts/glossary.html)
+
+## API References
+
+The API references list and explain all features of Flink's APIs.
+
+* [Basic API Concepts](dev/api_concepts.html)
+* [DataStream API](dev/datastream_api.html)
+* [DataSet API](dev/batch/index.html)
+* [Table API &amp; SQL](dev/table/index.html)
 
 ## Deployment
 
diff --git a/docs/redirects/example_quickstart.md b/docs/redirects/example_quickstart.md
index d2736c4..85318df 100644
--- a/docs/redirects/example_quickstart.md
+++ b/docs/redirects/example_quickstart.md
@@ -1,7 +1,7 @@
 ---
 title: "DataStream API Tutorial"
 layout: redirect
-redirect: /getting-started/tutorials/datastream_api.html
+redirect: /getting-started/walkthroughs/datastream_api.html
 permalink: /quickstart/run_example_quickstart.html
 ---
 <!--
diff --git a/docs/redirects/tutorials_datastream_api.md b/docs/redirects/tutorials_datastream_api.md
index 2d2dafc..708bea5 100644
--- a/docs/redirects/tutorials_datastream_api.md
+++ b/docs/redirects/tutorials_datastream_api.md
@@ -1,7 +1,7 @@
 ---
 title: "DataStream API"
 layout: redirect
-redirect: /getting-started/tutorials/datastream_api.html
+redirect: /getting-started/walkthroughs/datastream_api.html
 permalink: /tutorials/datastream_api.html
 ---
 <!--


[flink] 01/02: [FLINK-12746][docs] Add DataStream API Walkthrough

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df8f9a586143bbd719b6e9f03592e02e45629a9a
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Jul 22 16:01:44 2019 -0500

    [FLINK-12746][docs] Add DataStream API Walkthrough
    
    This closes #9201.
---
 docs/fig/fraud-transactions.svg                    |  71 ++
 .../getting-started/walkthroughs/datastream_api.md | 925 +++++++++++++++++++++
 .../walkthroughs/datastream_api.zh.md              | 925 +++++++++++++++++++++
 docs/getting-started/walkthroughs/table_api.md     |   2 +-
 docs/getting-started/walkthroughs/table_api.zh.md  |   2 +-
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 flink-end-to-end-tests/test-scripts/common.sh      |  12 +
 flink-end-to-end-tests/test-scripts/test_cli.sh    |  11 -
 ...throughs.sh => test_datastream_walkthroughs.sh} |  35 +-
 .../test-scripts/test_table_walkthroughs.sh        |   1 +
 .../flink/walkthrough/common/entity/Alert.java     |  61 ++
 .../flink/walkthrough/common/sink/AlertSink.java   |  43 +
 .../flink-walkthrough-datastream-java/pom.xml      |  37 +
 .../META-INF/maven/archetype-metadata.xml          |  36 +
 .../src/main/resources/archetype-resources/pom.xml | 225 +++++
 .../src/main/java/FraudDetectionJob.java           |  50 ++
 .../src/main/java/FraudDetector.java               |  48 ++
 .../src/main/resources/log4j.properties            |  24 +
 .../flink-walkthrough-datastream-scala/pom.xml     |  37 +
 .../META-INF/maven/archetype-metadata.xml          |  36 +
 .../src/main/resources/archetype-resources/pom.xml | 256 ++++++
 .../src/main/resources/log4j.properties            |  24 +
 .../src/main/scala/FraudDetectionJob.scala         |  51 ++
 .../src/main/scala/FraudDetector.scala             |  49 ++
 flink-walkthroughs/pom.xml                         |   2 +
 25 files changed, 2945 insertions(+), 20 deletions(-)

diff --git a/docs/fig/fraud-transactions.svg b/docs/fig/fraud-transactions.svg
new file mode 100644
index 0000000..f8e59d9
--- /dev/null
+++ b/docs/fig/fraud-transactions.svg
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<svg version="1.1" viewBox="0 0 842.6483154296875 203.27821350097656" fill="none" stroke="none" stroke-linecap="square" stroke-miterlimit="10" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg" width="100%" height="100%">
+    <rect id="svgEditorBackground" x="0" y="0" width="842.648" height="203.278" style="fill:none;stroke:none;" />
+    <clipPath id="p.0">
+        <path d="m0 0l842.6483 0l0 203.27821l-842.6483 0l0 -203.27821z" clip-rule="nonzero" />
+    </clipPath>
+    <g clip-path="url(#p.0)">
+        <path fill="#000000" fill-opacity="0.0" d="M0,0l842.6483,0l0,203.27821l-842.6483,0Z" fill-rule="evenodd" />
+        <path fill="#000000" fill-opacity="0.0" d="m203.24672 147.81758l0 -46.015747l138.67717 0l0 46.015747z" fill-rule="evenodd" />
+        <path fill="#000000" d="M265.14047,136.5832q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21876499999999,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.3593673700000011,-5.265625,-1.3124923999999965q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.93751499999999, [...]
+        <path fill="#000000" d="M608.6628,138.55434q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21875,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.359375,-5.265625,-1.3125q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.9375,0.4375q4.171875,0.703125,6.4375,2.40625q2 [...]
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.850395 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m119.850395 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m195.85039 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m271.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m347.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m423.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m499.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m575.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m651.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m727.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m803.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 19.552494l760.9974 0" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 54.749344l760.9974 0" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 94.749344l760.9974 0" fill-rule="nonzero" />
+        <path fill="#000000" d="m68.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 0 [...]
+        <path fill="#000000" d="m144.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m220.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m296.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m372.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m448.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m524.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m600.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m676.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m748.9306 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 0 [...]
+        <path fill="#000000" d="m64.78021 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" d="m140.78021 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m220.48645 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m289.07397 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m365.07397 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m444.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" d="m524.48645 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m596.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" d="m669.074 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.609375 [...]
+        <path fill="#000000" d="m748.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" fill-opacity="0.0" d="m236.08136 158.07217l73.00787 0l0 68.28346l-73.00787 0z" fill-rule="evenodd" />
+        <path fill="#000000" d="m246.61261 184.99217l0 -13.359375l9.015625 0l0 1.578125l-7.25 0l0 4.140625l6.265625 0l0 1.578125l-6.265625 0l0 6.0625l-1.765625 0zm11.083496 0l0 -9.671875l1.46875 0l0 1.46875q0.5625 -1.03125 1.03125 -1.359375q0.484375 -0.328125 1.0625 -0.328125q0.828125 0 1.6875 0.53125l-0.5625 1.515625q-0.609375 -0.359375 -1.203125 -0.359375q-0.546875 0 -0.96875 0.328125q-0.421875 0.328125 -0.609375 0.890625q-0.28125 0.875 -0.28125 1.921875l0 5.0625l-1.625 0zm12.540802 -1 [...]
+        <path fill="#000000" fill-opacity="0.0" d="m563.58795 160.0433l105.03937 0l0 68.28346l-105.03937 0z" fill-rule="evenodd" />
+        <path fill="#000000" d="m574.0098 186.9633l0 -13.359375l1.8125 0l7.015625 10.484375l0 -10.484375l1.6875 0l0 13.359375l-1.8125 0l-7.015625 -10.5l0 10.5l-1.6875 0zm12.676025 -4.84375q0 -2.6875 1.484375 -3.96875q1.25 -1.078125 3.046875 -1.078125q2.0 0 3.265625 1.3125q1.265625 1.296875 1.265625 3.609375q0 1.859375 -0.5625 2.9375q-0.5625 1.0625 -1.640625 1.65625q-1.0625 0.59375 -2.328125 0.59375q-2.03125 0 -3.28125 -1.296875q-1.25 -1.3125 -1.25 -3.765625zm1.6875 0q0 1.859375 0.796875  [...]
+    </g>
+</svg>
diff --git a/docs/getting-started/walkthroughs/datastream_api.md b/docs/getting-started/walkthroughs/datastream_api.md
new file mode 100644
index 0000000..8676ae4
--- /dev/null
+++ b/docs/getting-started/walkthroughs/datastream_api.md
@@ -0,0 +1,925 @@
+---
+title: "DataStream API"
+nav-id: datastreamwalkthrough
+nav-title: 'DataStream API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a DataStream API for building robust, stateful streaming applications.
+It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems.
+In this step-by-step guide you'll learn how to build a stateful streaming application with Flink's DataStream API.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building? 
+
+Credit card fraud is a growing concern in the digital age.
+Criminals steal credit card numbers by running scams or hacking into insecure systems.
+Stolen numbers are tested by making one or more small purchases, often for a dollar or less.
+If that works, they then make more significant purchases to get items they can sell or keep for themselves.
+
+In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions.
+Using a simple set of rules, you will see how Flink allows you to implement advanced business logic and act in real time.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+
+## How to Follow Along
+
+If you want to follow along, you will need a computer with:
+
+* Java 8 
+* Maven 
+
+A provided Flink Maven Archetype will quickly create a skeleton project with all the necessary dependencies, so that you can focus on filling out the business logic.
+These dependencies include `flink-streaming-java`, the core dependency for all Flink streaming applications, and `flink-walkthrough-common`, which provides data generators and other classes specific to this walkthrough.
+
+{% panel **Note:** Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available [at the bottom of the page](#final-application). %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) on the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">official Maven documentation</a>.
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId`, and `package` if you like. With the above parameters,
+Maven will create a folder named `frauddetection` that contains a project with all the dependencies needed to complete this tutorial.
+After importing the project into your editor, you will find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code, which you can run directly inside your IDE.
+Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Transaction> transactions = env
+            .addSource(new TransactionSource())
+            .name("transactions");
+        
+        DataStream<Alert> alerts = transactions
+            .keyBy(Transaction::getAccountId)
+            .process(new FraudDetector())
+            .name("fraud-detector");
+
+        alerts
+            .addSink(new AlertSink())
+            .name("send-alerts");
+
+        env.execute("Fraud Detection");
+    }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+#### FraudDetectionJob.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+object FraudDetectionJob {
+
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource)
+      .name("transactions")
+
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId)
+      .process(new FraudDetector)
+      .name("fraud-detector")
+
+    alerts
+      .addSink(new AlertSink)
+      .name("send-alerts")
+
+    env.execute("Fraud Detection")
+  }
+}
+{% endhighlight %}
+
+#### FraudDetector.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down the Code
+
+Let's walk step-by-step through the code of these two files. The `FraudDetectionJob` class defines the data flow of the application and the `FraudDetector` class defines the business logic of the function that detects fraudulent transactions.
+
+We start describing how the Job is assembled in the `main` method of the `FraudDetectionJob` class.
+
+#### The Execution Environment
+
+The first line sets up your `StreamExecutionEnvironment`.
+The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+{% endhighlight %}
+</div>
+</div>
+
+#### Creating a Source
+
+Sources ingest data from external systems, such as Apache Kafka, RabbitMQ, or Apache Pulsar, into Flink Jobs.
+This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process.
+Each transaction contains an account ID (`accountId`), a timestamp (`timestamp`) of when the transaction occurred, and a US$ amount (`amount`).
+The `name` attached to the source is just for debugging purposes, so if something goes wrong, you will know where the error originated.
+
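+To make the record structure concrete, here is a minimal sketch of reading those fields. It is not part of the walkthrough code; `getAccountId()` and `getAmount()` appear elsewhere in this walkthrough, while `getTimestamp()` is an assumed accessor name for the timestamp field.
+
+{% highlight java %}
+// Hypothetical helper, for illustration only.
+static String describe(Transaction t) {
+    return "account=" + t.getAccountId()   // account ID
+        + " at=" + t.getTimestamp()        // assumed accessor for the timestamp
+        + " amount=$" + t.getAmount();     // US$ amount
+}
+{% endhighlight %}
+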
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+    .addSource(new TransactionSource())
+    .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions: DataStream[Transaction] = env
+  .addSource(new TransactionSource)
+  .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Partitioning Events & Detecting Fraud
+
+The `transactions` stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.
+
+To ensure that the same physical task processes all records for a particular key, you can partition a stream using `DataStream#keyBy`. 
+The `process()` call adds an operator that applies a function to each partitioned element in the stream.
+The operator immediately following a `keyBy`, in this case `FraudDetector`, is commonly said to execute within a _keyed context_.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Alert> alerts = transactions
+    .keyBy(Transaction::getAccountId)
+    .process(new FraudDetector())
+    .name("fraud-detector");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val alerts: DataStream[Alert] = transactions
+  .keyBy(transaction => transaction.getAccountId)
+  .process(new FraudDetector)
+  .name("fraud-detector")
+{% endhighlight %}
+</div>
+</div>
+
+#### Outputting Results
+ 
+A sink writes a `DataStream` to an external system, such as Apache Kafka, Cassandra, or AWS Kinesis.
+The `AlertSink` logs each `Alert` record with log level **INFO**, instead of writing it to persistent storage, so you can easily see your results.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+alerts.addSink(new AlertSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+alerts.addSink(new AlertSink)
+{% endhighlight %}
+</div>
+</div>
+
+#### Executing the Job
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+Call `StreamExecutionEnvironment#execute` to begin the execution of your Job and give it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Fraud Detection");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Fraud Detection")
+{% endhighlight %}
+</div>
+</div>
+
+#### The Fraud Detector
+
+The fraud detector is implemented as a `KeyedProcessFunction`.
+Its method `KeyedProcessFunction#processElement` is called for every transaction event.
+This first version produces an alert on every transaction, which some may say is overly conservative.
+
+The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+  
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Writing a Real Application (v1)
+
+For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one, where small is anything less than $1.00 and large is more than $500.
+Imagine your fraud detector processes the following stream of transactions for a particular account.
+
+<p class="text-center">
+    <img alt="Transactions" width="80%" src="{{ site.baseurl }}/fig/fraud-transactions.svg"/>
+</p>
+
+Transactions 3 and 4 should be marked as fraudulent because a small transaction, $0.09, is immediately followed by a large one, $510.
+Alternatively, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl }}/concepts/glossary.html#managed-state), and that is why we decided to use a [KeyedProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html). 
+It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is set for that account.
+
+However, merely implementing the flag as a member variable in the `FraudDetector` class will not work. 
+Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means if accounts A and B are routed through the same instance of `FraudDetector`, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert. 
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys; however, a simple member variable would not be fault-tolerant, and all of its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
+
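+To make the pitfall concrete, here is a sketch of that naive approach (illustrative only; do not use it):
+
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+
+    // BROKEN: this field is shared by every account routed to this parallel
+    // instance, and its value is lost if the job restarts after a failure.
+    private boolean lastTransactionWasSmall;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) {
+
+        if (lastTransactionWasSmall && transaction.getAmount() > LARGE_AMOUNT) {
+            Alert alert = new Alert();
+            alert.setId(transaction.getAccountId());
+            collector.collect(alert);
+        }
+
+        lastTransactionWasSmall = transaction.getAmount() < SMALL_AMOUNT;
+    }
+}
+{% endhighlight %}
+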
+To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.
+
+The most basic type of state in Flink is [ValueState]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state), a data type that adds fault tolerance to any variable it wraps.
+`ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_, that is, in any operator immediately following `DataStream#keyBy`.
+A _keyed state_ of an operator is automatically scoped to the key of the record that is currently processed.
+In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account. 
+`ValueState` is created using a `ValueStateDescriptor`, which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data.
+The right hook for this is the `open()` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient ValueState<Boolean> flagState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` in the Java standard library.
+It provides three methods for interacting with its contents: `update` sets the state, `value` gets the current value, and `clear` deletes its contents.
+If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`.
+Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, so all changes must be performed with `ValueState#update`.
+Beyond that, fault tolerance is managed automatically by Flink under the hood, so you can interact with `ValueState` as you would with any standard variable.
+
+Below, you can see an example of how you can use a flag state to track potential fraudulent transactions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                // Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);            
+            }
+
+            // Clean up our state
+            flagState.clear();
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // Set the flag to true
+            flagState.update(true);
+        }
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      flagState.clear()
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+    }
+  }
+{% endhighlight %}
+</div>
+</div>
+
+For every transaction, the fraud detector checks the state of the flag for that account.
+Remember, `ValueState` is always scoped to the current key, i.e., account.
+If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert.
+
+After that check, the flag state is unconditionally cleared.
+Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted.
+
+Finally, the transaction amount is checked to see if it is small.
+If so, then the flag is set so that it can be checked by the next event.
+Notice that `ValueState<Boolean>` actually has three states: unset (`null`), `true`, and `false`, because all `ValueState` instances are nullable.
+This job only makes use of unset (`null`) and `true` to check whether the flag is set or not.
+
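+As a sketch of this three-valued logic (reusing the `flagState` variable from above):
+
+{% highlight java %}
+Boolean flag = flagState.value();
+if (flag == null) {
+    // unset: no small transaction has been recorded for this key yet
+} else if (flag) {
+    // true: the previous transaction for this key was small
+} else {
+    // false: possible in general since ValueState is nullable,
+    // but never produced by this job, which only updates the flag to true
+}
+{% endhighlight %}
+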
+## Fraud Detector v2: State + Time = &#10084;&#65039;
+
+Scammers don't wait long to make their large purchase, which reduces the chance that their test transaction is noticed.
+For example, suppose you want to add a 1-minute timeout to your fraud detector; i.e., in the previous example, transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other.
+Flink's `KeyedProcessFunction` allows you to set timers which invoke a callback method at some point in time in the future.
+
+Let's see how we can modify our Job to comply with our new requirements:
+
+* Whenever the flag is set to `true`, also set a timer for 1 minute in the future.
+* When the timer fires, reset the flag by clearing its state.
+* If the flag is ever cleared, the timer should be canceled.
+
+To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service.
+The timer service can be used to query the current time, register timers, and delete timers.
+With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+if (transaction.getAmount() < SMALL_AMOUNT) {
+    // set the flag to true
+    flagState.update(true);
+
+    // set the timer and timer state
+    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+    context.timerService().registerProcessingTimeTimer(timer);
+    timerState.update(timer);
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+  // set the flag to true
+  flagState.update(true)
+
+  // set the timer and timer state
+  val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+  context.timerService.registerProcessingTimeTimer(timer)
+  timerState.update(timer)
+}
+{% endhighlight %}
+</div>
+</div>
+
+Processing time is wall-clock time, determined by the system clock of the machine running the operator.
+
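+As a side note, Flink's timer service also supports event time timers, which fire based on watermarks rather than the wall clock; this walkthrough only uses processing time. A minimal sketch of the two variants:
+
+{% highlight java %}
+// Processing time timer: fires based on the machine's wall clock.
+long wallClock = context.timerService().currentProcessingTime();
+context.timerService().registerProcessingTimeTimer(wallClock + ONE_MINUTE);
+
+// Event time timer (not used in this walkthrough): fires once the
+// watermark passes the given timestamp.
+// context.timerService().registerEventTimeTimer(someEventTimestamp);
+{% endhighlight %}
+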
+When a timer fires, it calls `KeyedProcessFunction#onTimer`. 
+Overriding this method is how you can implement your callback to reset the flag.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+    // remove flag after 1 minute
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(
+    timestamp: Long,
+    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+    out: Collector[Alert]): Unit = {
+  // remove flag after 1 minute
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.
+You can wrap this in a helper method and call this method instead of `flagState.clear()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+private void cleanUp(Context ctx) throws Exception {
+    // delete timer
+    Long timer = timerState.value();
+    ctx.timerService().deleteProcessingTimeTimer(timer);
+
+    // clean up all state
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@throws[Exception]
+private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+  // delete timer
+  val timer = timerState.value
+  ctx.timerService.deleteProcessingTimeTimer(timer)
+
+  // clean up all states
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                // Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);
+            }
+            // Clean up our state
+            cleanUp(context);
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // set the flag to true
+            flagState.update(true);
+
+            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+            context.timerService().registerProcessingTimeTimer(timer);
+
+            timerState.update(timer);
+        }
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+        // remove flag after 1 minute
+        timerState.clear();
+        flagState.clear();
+    }
+
+    private void cleanUp(Context ctx) throws Exception {
+        // delete timer
+        Long timer = timerState.value();
+        ctx.timerService().deleteProcessingTimeTimer(timer);
+
+        // clean up all state
+        timerState.clear();
+        flagState.clear();
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      cleanUp(context)
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+
+      context.timerService.registerProcessingTimeTimer(timer)
+      timerState.update(timer)
+    }
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+      out: Collector[Alert]): Unit = {
+    // remove flag after 1 minute
+    timerState.clear()
+    flagState.clear()
+  }
+
+  @throws[Exception]
+  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+    // delete timer
+    val timer = timerState.value
+    ctx.timerService.deleteProcessingTimeTimer(timer)
+
+    // clean up all states
+    timerState.clear()
+    flagState.clear()
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Expected Output
+
+Running this code with the provided `TransactionSource` will emit fraud alerts for account 3.
+You should see the following output in your task manager logs: 
+
+{% highlight bash %}
+2019-08-19 14:22:06,220 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:11,383 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+{% endhighlight %}
diff --git a/docs/getting-started/walkthroughs/datastream_api.zh.md b/docs/getting-started/walkthroughs/datastream_api.zh.md
new file mode 100644
index 0000000..8676ae4
--- /dev/null
+++ b/docs/getting-started/walkthroughs/datastream_api.zh.md
@@ -0,0 +1,925 @@
+---
+title: "DataStream API"
+nav-id: datastreamwalkthrough
+nav-title: 'DataStream API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a DataStream API for building robust, stateful streaming applications.
+It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems.
+In this step-by-step guide you'll learn how to build a stateful streaming application with Flink's DataStream API.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building? 
+
+Credit card fraud is a growing concern in the digital age.
+Criminals steal credit card numbers by running scams or hacking into insecure systems.
+Stolen numbers are tested by making one or more small purchases, often for a dollar or less.
+If that works, they then make more significant purchases to get items they can sell or keep for themselves.
+
+In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions.
+Using a simple set of rules, you will see how Flink allows you to implement advanced business logic and act in real time.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+
+## How to Follow Along
+
+If you want to follow along, you will need a computer with:
+
+* Java 8 
+* Maven 
+
+A provided Flink Maven Archetype will quickly create a skeleton project with all the necessary dependencies, so you only need to focus on filling out the business logic.
+These dependencies include `flink-streaming-java`, the core dependency for all Flink streaming applications, and `flink-walkthrough-common`, which provides the data generators and other classes specific to this walkthrough.
+
+{% panel **Note:** Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available [at the bottom of the page](#final-application). %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">official Maven documentation</a>.
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId`, and `package` if you like. With the above parameters,
+Maven will create a folder named `frauddetection` that contains a project with all the dependencies to complete this tutorial.
+After importing the project into your editor, you can find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code, which you can run directly inside your IDE.
+Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Transaction> transactions = env
+            .addSource(new TransactionSource())
+            .name("transactions");
+        
+        DataStream<Alert> alerts = transactions
+            .keyBy(Transaction::getAccountId)
+            .process(new FraudDetector())
+            .name("fraud-detector");
+
+        alerts
+            .addSink(new AlertSink())
+            .name("send-alerts");
+
+        env.execute("Fraud Detection");
+    }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+#### FraudDetectionJob.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+object FraudDetectionJob {
+
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource)
+      .name("transactions")
+
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId)
+      .process(new FraudDetector)
+      .name("fraud-detector")
+
+    alerts
+      .addSink(new AlertSink)
+      .name("send-alerts")
+
+    env.execute("Fraud Detection")
+  }
+}
+{% endhighlight %}
+
+#### FraudDetector.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down the Code
+
+Let's walk step-by-step through the code of these two files. The `FraudDetectionJob` class defines the data flow of the application and the `FraudDetector` class defines the business logic of the function that detects fraudulent transactions.
+
+We start by describing how the Job is assembled in the `main` method of the `FraudDetectionJob` class.
+
+#### The Execution Environment
+
+The first line sets up your `StreamExecutionEnvironment`.
+The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+{% endhighlight %}
+</div>
+</div>
+
+#### Creating a Source
+
+Sources ingest data from external systems, such as Apache Kafka, Rabbit MQ, or Apache Pulsar, into Flink Jobs.
+This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process.
+Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) of when the transaction occurred, and US$ amount (`amount`).
+The `name` attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+    .addSource(new TransactionSource())
+    .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions: DataStream[Transaction] = env
+  .addSource(new TransactionSource)
+  .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Partitioning Events & Detecting Fraud
+
+The `transactions` stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.
+
+To ensure that the same physical task processes all records for a particular key, you can partition a stream using `DataStream#keyBy`. 
+The `process()` call adds an operator that applies a function to each partitioned element in the stream.
+It is common to say the operator immediately after a `keyBy`, in this case `FraudDetector`, is executed within a _keyed context_.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Alert> alerts = transactions
+    .keyBy(Transaction::getAccountId)
+    .process(new FraudDetector())
+    .name("fraud-detector");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val alerts: DataStream[Alert] = transactions
+  .keyBy(transaction => transaction.getAccountId)
+  .process(new FraudDetector)
+  .name("fraud-detector")
+{% endhighlight %}
+</div>
+</div>
+
+#### Outputting Results
+ 
+A sink writes a `DataStream` to an external system, such as Apache Kafka, Cassandra, or AWS Kinesis.
+The `AlertSink` logs each `Alert` record with log level **INFO**, instead of writing it to persistent storage, so you can easily see your results.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+alerts.addSink(new AlertSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+alerts.addSink(new AlertSink)
+{% endhighlight %}
+</div>
+</div>
+
+#### Executing the Job
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+Call `StreamExecutionEnvironment#execute` to begin the execution of our Job and give it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Fraud Detection");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Fraud Detection")
+{% endhighlight %}
+</div>
+</div>
+
+#### The Fraud Detector
+
+The fraud detector is implemented as a `KeyedProcessFunction`.
+Its method `KeyedProcessFunction#processElement` is called for every transaction event.
+This first version produces an alert on every transaction, which some may say is overly conservative.
+
+The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+  
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Writing a Real Application (v1)
+
+For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one, where small means anything less than $1.00 and large means more than $500.00.
+Imagine your fraud detector processes the following stream of transactions for a particular account.
+
+<p class="text-center">
+    <img alt="Transactions" width="80%" src="{{ site.baseurl }}/fig/fraud-transactions.svg"/>
+</p>
+
+Transactions 3 and 4 should be marked as fraudulent because a small transaction of $0.09 is immediately followed by a large one of $510.
+In contrast, transactions 7, 8, and 9 are not fraudulent because the small amount of $0.02 is not immediately followed by a large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl }}/concepts/glossary.html#managed-state), and that is why we decided to use a [KeyedProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html). 
+It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is set for that account.
+
+However, merely implementing the flag as a member variable in the `FraudDetector` class will not work.
+Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means that if accounts A and B are routed through the same instance, a transaction for account A could set the flag to true and then a transaction for account B could trigger a false alert.
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys; however, a simple member variable would not be fault-tolerant, and all of its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
+
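+For illustration only, here is a minimal sketch of that naive approach (a hypothetical `NaiveFraudDetector`, not part of the walkthrough code), so you can see the anti-pattern spelled out:
+
+{% highlight java %}
+// ANTI-PATTERN: a plain member variable is neither scoped per account nor checkpointed.
+public class NaiveFraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+
+    // Shared across all accounts routed to this instance, and lost on failure!
+    private boolean lastTransactionWasSmall;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) {
+
+        if (lastTransactionWasSmall && transaction.getAmount() > LARGE_AMOUNT) {
+            Alert alert = new Alert();
+            alert.setId(transaction.getAccountId());
+            collector.collect(alert);
+        }
+
+        lastTransactionWasSmall = transaction.getAmount() < SMALL_AMOUNT;
+    }
+}
+{% endhighlight %}
+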
+To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.
+
+The most basic type of state in Flink is [ValueState]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state), a data type that adds fault tolerance to any variable it wraps.
+`ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_, i.e., any operator immediately following `DataStream#keyBy`.
+A _keyed state_ of an operator is automatically scoped to the key of the record that is currently processed.
+In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account. 
+`ValueState` is created using a `ValueStateDescriptor` which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data.
+The right hook for this is the `open()` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient ValueState<Boolean> flagState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` in the Java standard library.
+It provides three methods for interacting with its contents: `update` sets the state, `value` gets the current value, and `clear` deletes its contents.
+If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`.
+Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, and so all changes must be performed with `ValueState#update`.
+Beyond that, fault tolerance is managed automatically by Flink under the hood, so you can interact with `ValueState` as with any standard variable.
+
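+As a quick illustration of these three methods, consider this minimal sketch (assuming the `flagState` variable registered above):
+
+{% highlight java %}
+Boolean flag = flagState.value(); // null if the state for the current key is empty
+flagState.update(true);           // set the state for the current key
+flagState.clear();                // delete the contents; value() returns null again
+{% endhighlight %}
+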
+Below, you can see an example of how you can use a flag state to track potential fraudulent transactions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                // Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);            
+            }
+
+            // Clean up our state
+            flagState.clear();
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // Set the flag to true
+            flagState.update(true);
+        }
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      flagState.clear()
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+    }
+  }
+{% endhighlight %}
+</div>
+</div>
+
+For every transaction, the fraud detector checks the state of the flag for that account.
+Remember, `ValueState` is always scoped to the current key, i.e., account.
+If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert.
+
+After that check, the flag state is unconditionally cleared.
+Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted.
+
+Finally, the transaction amount is checked to see if it is small.
+If so, then the flag is set so that it can be checked by the next event.
+Notice that `ValueState<Boolean>` actually has three states: unset (`null`), `true`, and `false`, because all `ValueState` instances are nullable.
+This job only makes use of unset (`null`) and `true` to check whether the flag is set or not.
+
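+To make the state transitions concrete, here is a standalone sketch (hypothetical, not part of the walkthrough) that simulates the v1 flag logic over a stream of amounts loosely based on the figure above:
+
+{% highlight java %}
+public class FlagLogicTrace {
+    public static void main(String[] args) {
+        // $0.09 then $510.00 should alert; $0.02 followed by an intermediate
+        // $45.00 breaks the pattern, so the final $510.00 does not alert.
+        double[] amounts = {0.09, 510.00, 0.02, 45.00, 510.00};
+        Boolean lastTransactionWasSmall = null; // stands in for flagState.value()
+
+        for (double amount : amounts) {
+            if (lastTransactionWasSmall != null && amount > 500.00) {
+                System.out.println("ALERT for amount $" + amount);
+            }
+            // the flag is always cleared after the check, then set again on small amounts
+            lastTransactionWasSmall = (amount < 1.00) ? true : null;
+        }
+    }
+}
+{% endhighlight %}
+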
+## Fraud Detector v2: State + Time = &#10084;&#65039;
+
+Scammers don't wait long to make their large purchase, in order to reduce the chance that their test transaction is noticed.
+For example, suppose you wanted to add a 1 minute timeout to your fraud detector; i.e., in the previous example, transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other.
+Flink's `KeyedProcessFunction` allows you to set timers which invoke a callback method at some point in time in the future.
+
+Let's see how we can modify our Job to comply with our new requirements:
+
+* Whenever the flag is set to `true`, also set a timer for 1 minute in the future.
+* When the timer fires, reset the flag by clearing its state.
+* If the flag is ever cleared, the timer should be canceled.
+
+To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service.
+The timer service can be used to query the current time, register timers, and delete timers.
+With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+if (transaction.getAmount() < SMALL_AMOUNT) {
+    // set the flag to true
+    flagState.update(true);
+
+    // set the timer and timer state
+    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+    context.timerService().registerProcessingTimeTimer(timer);
+    timerState.update(timer);
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+  // set the flag to true
+  flagState.update(true)
+
+  // set the timer and timer state
+  val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+  context.timerService.registerProcessingTimeTimer(timer)
+  timerState.update(timer)
+}
+{% endhighlight %}
+</div>
+</div>
+
+Processing time is wall-clock time, determined by the system clock of the machine running the operator.
+
+When a timer fires, it calls `KeyedProcessFunction#onTimer`. 
+Overriding this method is how you can implement your callback to reset the flag.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+    // remove flag after 1 minute
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(
+    timestamp: Long,
+    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+    out: Collector[Alert]): Unit = {
+  // remove flag after 1 minute
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.
+You can wrap this in a helper method and call this method instead of `flagState.clear()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+private void cleanUp(Context ctx) throws Exception {
+    // delete timer
+    Long timer = timerState.value();
+    ctx.timerService().deleteProcessingTimeTimer(timer);
+
+    // clean up all state
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@throws[Exception]
+private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+  // delete timer
+  val timer = timerState.value
+  ctx.timerService.deleteProcessingTimeTimer(timer)
+
+  // clean up all states
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                // Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);
+            }
+            // Clean up our state
+            cleanUp(context);
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // set the flag to true
+            flagState.update(true);
+
+            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+            context.timerService().registerProcessingTimeTimer(timer);
+
+            timerState.update(timer);
+        }
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+        // remove flag after 1 minute
+        timerState.clear();
+        flagState.clear();
+    }
+
+    private void cleanUp(Context ctx) throws Exception {
+        // delete timer
+        Long timer = timerState.value();
+        ctx.timerService().deleteProcessingTimeTimer(timer);
+
+        // clean up all state
+        timerState.clear();
+        flagState.clear();
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      cleanUp(context)
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+
+      context.timerService.registerProcessingTimeTimer(timer)
+      timerState.update(timer)
+    }
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+      out: Collector[Alert]): Unit = {
+    // remove flag after 1 minute
+    timerState.clear()
+    flagState.clear()
+  }
+
+  @throws[Exception]
+  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+    // delete timer
+    val timer = timerState.value
+    ctx.timerService.deleteProcessingTimeTimer(timer)
+
+    // clean up all states
+    timerState.clear()
+    flagState.clear()
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Expected Output
+
+Running this code with the provided `TransactionSource` will emit fraud alerts for account 3.
+You should see the following output in your task manager logs: 
+
+{% highlight bash %}
+2019-08-19 14:22:06,220 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:11,383 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+{% endhighlight %}
diff --git a/docs/getting-started/walkthroughs/table_api.md b/docs/getting-started/walkthroughs/table_api.md
index 878fb54..f0cbb62 100644
--- a/docs/getting-started/walkthroughs/table_api.md
+++ b/docs/getting-started/walkthroughs/table_api.md
@@ -3,7 +3,7 @@ title: "Table API"
 nav-id: tableapiwalkthrough
 nav-title: 'Table API'
 nav-parent_id: walkthroughs
-nav-pos: 1
+nav-pos: 2
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/walkthroughs/table_api.zh.md b/docs/getting-started/walkthroughs/table_api.zh.md
index 878fb54..f0cbb62 100644
--- a/docs/getting-started/walkthroughs/table_api.zh.md
+++ b/docs/getting-started/walkthroughs/table_api.zh.md
@@ -3,7 +3,7 @@ title: "Table API"
 nav-id: tableapiwalkthrough
 nav-title: 'Table API'
 nav-parent_id: walkthroughs
-nav-pos: 1
+nav-pos: 2
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 3b34b560..9307188 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -140,6 +140,8 @@ run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scrip
 
 run_test "Walkthrough Table Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh java"
 run_test "Walkthrough Table Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh scala"
+run_test "Walkthrough DataStream Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh java"
+run_test "Walkthrough DataStream Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh scala"
 
 run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_gcp_pubsub.sh"
 
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 2dc3787..bdecb32 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -745,3 +745,15 @@ function retry_times() {
     return 1
 }
 
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+
+function extract_job_id_from_job_submission_return() {
+    if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+        then
+            JOB_ID="${BASH_REMATCH[1]}";
+        else
+            JOB_ID=""
+        fi
+    echo "$JOB_ID"
+}
+
diff --git a/flink-end-to-end-tests/test-scripts/test_cli.sh b/flink-end-to-end-tests/test-scripts/test_cli.sh
index b9d285b..4b04890 100755
--- a/flink-end-to-end-tests/test-scripts/test_cli.sh
+++ b/flink-end-to-end-tests/test-scripts/test_cli.sh
@@ -29,23 +29,12 @@ $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start
 
 # CLI regular expressions
-JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
 JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\""
 JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\""
 JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :"
 
 EXIT_CODE=0
 
-function extract_job_id_from_job_submission_return() {
-    if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
-        then
-            JOB_ID="${BASH_REMATCH[1]}";
-        else
-            JOB_ID=""
-        fi
-    echo "$JOB_ID"
-}
-
 function extract_valid_pact_from_job_info_return() {
     PACT_MATCH=0
     if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]];
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
similarity index 75%
copy from flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
copy to flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
index 77afc58..e976927 100755
--- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
+++ b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
@@ -19,7 +19,7 @@
 
 # End to end test for quick starts test.
 # Usage:
-# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh <Type (java or scala)>
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh <Type (java or scala)>
 
 source "$(dirname "$0")"/common.sh
 
@@ -28,12 +28,12 @@ TEST_TYPE=$1
 mkdir -p "${TEST_DATA_DIR}"
 cd "${TEST_DATA_DIR}"
 
-ARTIFACT_ID=flink-walkthrough-table-${TEST_TYPE}
+ARTIFACT_ID=flink-walkthrough-datastream-${TEST_TYPE}
 ARTIFACT_VERSION=0.1
 
 mvn archetype:generate                                          \
     -DarchetypeGroupId=org.apache.flink                         \
-    -DarchetypeArtifactId=flink-walkthrough-table-${TEST_TYPE}  \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-${TEST_TYPE}  \
     -DarchetypeVersion=${FLINK_VERSION}                         \
     -DgroupId=org.apache.flink.walkthrough                      \
     -DartifactId=${ARTIFACT_ID}                                 \
@@ -46,7 +46,8 @@ cd "${ARTIFACT_ID}"
 mvn clean package -nsu > compile-output.txt
 
 if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
-    echo "Failure: The walk-through did not successfully compile"
+    echo "Failure: The walkthrough did not successfully compile"
+    cat compile-output.txt
     exit 1
 fi
 
@@ -67,8 +68,28 @@ fi
 
 TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar
 
-add_optional_lib "table"
-
 start_cluster
 
-${FLINK_DIR}/bin/flink run "$TEST_PROGRAM_JAR"
+JOB_ID=""
+EXIT_CODE=0
+
+RETURN=`$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR`
+echo "$RETURN"
+JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"`
+EXIT_CODE=$? # expect matching job id extraction
+
+if [ $EXIT_CODE == 0 ]; then
+    RETURN=`$FLINK_DIR/bin/flink list -r`
+    echo "$RETURN"
+    if [[ `echo "$RETURN" | grep -c "$JOB_ID"` -ne '1' ]]; then # expect exactly one match for the running job
+        echo "[FAIL] Unable to submit walkthrough"
+        EXIT_CODE=1
+    fi
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+    eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}"
+    EXIT_CODE=$?
+fi
+
+exit $EXIT_CODE
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
index 77afc58..f53e972 100755
--- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
+++ b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
@@ -47,6 +47,7 @@ mvn clean package -nsu > compile-output.txt
 
 if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
     echo "Failure: The walk-through did not successfully compile"
+    cat compile-output.txt
     exit 1
 fi
 
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
new file mode 100644
index 0000000..9678fdb
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.entity;
+
+import java.util.Objects;
+
+/**
+ * A simple alert event.
+ */
+@SuppressWarnings("unused")
+public final class Alert {
+
+	private long id;
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		Alert alert = (Alert) o;
+		return id == alert.id;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(id);
+	}
+
+	@Override
+	public String toString() {
+		return "Alert{" +
+			"id=" + id +
+			'}';
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
new file mode 100644
index 0000000..4332a8e
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.walkthrough.common.entity.Alert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A sink for outputting alerts.
+ */
+@PublicEvolving
+@SuppressWarnings("unused")
+public class AlertSink implements SinkFunction<Alert> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class);
+
+	@Override
+	public void invoke(Alert value, Context context) {
+		LOG.info(value.toString());
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml
new file mode 100644
index 0000000..8788a0f
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-walkthroughs</artifactId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-walkthrough-datastream-java</artifactId>
+	<packaging>maven-archetype</packaging>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..ed235f5
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+	xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+	name="flink-walkthrough-datastream-java">
+	<fileSets>
+		<fileSet filtered="true" packaged="true" encoding="UTF-8">
+			<directory>src/main/java</directory>
+			<includes>
+				<include>**/*.java</include>
+			</includes>
+		</fileSet>
+		<fileSet encoding="UTF-8">
+			<directory>src/main/resources</directory>
+		</fileSet>
+	</fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..8229d78
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,225 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>${groupId}</groupId>
+	<artifactId>${artifactId}</artifactId>
+	<version>${version}</version>
+	<packaging>jar</packaging>
+
+	<name>Flink Walkthrough DataStream Java</name>
+	<url>https://flink.apache.org</url>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flink.version>@project.version@</flink.version>
+		<java.version>1.8</java.version>
+		<scala.binary.version>2.11</scala.binary.version>
+		<maven.compiler.source>${java.version}</maven.compiler.source>
+		<maven.compiler.target>${java.version}</maven.compiler.target>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>apache.snapshots</id>
+			<name>Apache Development Snapshot Repository</name>
+			<url>https://repository.apache.org/content/repositories/snapshots/</url>
+			<releases>
+				<enabled>false</enabled>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
+		<!-- This dependency is provided, because it should not be packaged into the JAR file. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+		<!-- Example:
+
+		<dependency>
+		    <groupId>org.apache.flink</groupId>
+		    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+		    <version>${flink.version}</version>
+		</dependency>
+		-->
+
+		<!-- Add logging framework, to produce console output when running in the IDE. -->
+		<!-- These dependencies are excluded from the application JAR by default. -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.7.7</version>
+			<scope>runtime</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.17</version>
+			<scope>runtime</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+
+			<!-- Java Compiler -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>${java.version}</source>
+					<target>${java.version}</target>
+				</configuration>
+			</plugin>
+
+			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Run shade goal on package phase -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>org.apache.flink:force-shading</exclude>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>${package}.FraudDetectionJob</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+
+				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-shade-plugin</artifactId>
+										<versionRange>[3.0.0,)</versionRange>
+										<goals>
+											<goal>shade</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-compiler-plugin</artifactId>
+										<versionRange>[3.1,)</versionRange>
+										<goals>
+											<goal>testCompile</goal>
+											<goal>compile</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
+	<!-- This profile helps to make things run out of the box in IntelliJ -->
+	<!-- It adds Flink's core classes to the runtime class path. -->
+	<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+	<profiles>
+		<profile>
+			<id>add-dependencies-for-IDEA</id>
+
+			<activation>
+				<property>
+					<name>idea.version</name>
+				</property>
+			</activation>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
new file mode 100644
index 0000000..46019fd
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+/**
+ * Skeleton code for the DataStream walkthrough.
+ */
+public class FraudDetectionJob {
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
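+		// A source of simulated transaction events, provided by the
+		// flink-walkthrough-common module.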
+		DataStream<Transaction> transactions = env
+			.addSource(new TransactionSource())
+			.name("transactions");
+
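+		// Partition the stream by account id so that all transactions of an
+		// account are processed in order by the same detector instance.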
+		DataStream<Alert> alerts = transactions
+			.keyBy(Transaction::getAccountId)
+			.process(new FraudDetector())
+			.name("fraud-detector");
+
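+		// The AlertSink logs each alert at INFO level (see the log4j
+		// configuration) instead of writing to an external system.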
+		alerts
+			.addSink(new AlertSink())
+			.name("send-alerts");
+
+		env.execute("Fraud Detection");
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
new file mode 100644
index 0000000..e5c034c
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+/**
+ * Skeleton code for implementing a fraud detector.
+ */
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+	private static final long serialVersionUID = 1L;
+
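+	// Thresholds for the walkthrough's fraud pattern (a small transaction
+	// immediately followed by a large one); not yet used in this skeleton.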
+	private static final double SMALL_AMOUNT = 1.00;
+	private static final double LARGE_AMOUNT = 500.00;
+	private static final long ONE_MINUTE = 60 * 1000;
+
+	@Override
+	public void processElement(
+			Transaction transaction,
+			Context context,
+			Collector<Alert> collector) throws Exception {
+
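+		// Skeleton behavior: raise an alert for every transaction. The
+		// walkthrough replaces this with stateful logic based on the
+		// thresholds above.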
+		Alert alert = new Alert();
+		alert.setId(transaction.getAccountId());
+
+		collector.collect(alert);
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8be9b9a
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
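+# Keep Flink's own logging quiet (WARN), but raise the AlertSink logger to
+# INFO so the alerts produced by the walkthrough job appear on the console.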
+log4j.rootLogger=WARN, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=INFO
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
new file mode 100644
index 0000000..3e35e27
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-walkthroughs</artifactId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-walkthrough-datastream-scala</artifactId>
+	<packaging>maven-archetype</packaging>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..e78dc8d
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+	xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+	name="flink-walkthrough-datastream-scala">
+	<fileSets>
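+		<!-- 'filtered' enables ${package} substitution in the sources;
+		     'packaged' places them under the generated project's package directory. -->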
+		<fileSet filtered="true" packaged="true" encoding="UTF-8">
+			<directory>src/main/scala</directory>
+			<includes>
+				<include>**/*.scala</include>
+			</includes>
+		</fileSet>
+		<fileSet encoding="UTF-8">
+			<directory>src/main/resources</directory>
+		</fileSet>
+	</fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..93956fa
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,256 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>${groupId}</groupId>
+	<artifactId>${artifactId}</artifactId>
+	<version>${version}</version>
+	<packaging>jar</packaging>
+
+	<name>Flink Walkthrough DataStream Scala</name>
+	<url>https://flink.apache.org</url>
+
+	<repositories>
+		<repository>
+			<id>apache.snapshots</id>
+			<name>Apache Development Snapshot Repository</name>
+			<url>https://repository.apache.org/content/repositories/snapshots/</url>
+			<releases>
+				<enabled>false</enabled>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
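+		<!-- @project.version@ below is replaced with the current Flink version when the archetype itself is built. -->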
+		<flink.version>@project.version@</flink.version>
+		<scala.binary.version>2.11</scala.binary.version>
+		<scala.version>2.11.12</scala.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
+		<!-- Apache Flink dependencies -->
+		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+		<!-- Example:
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		-->
+
+		<!-- Add logging framework, to produce console output when running in the IDE. -->
+		<!-- These dependencies are excluded from the application JAR by default. -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.7.7</version>
+			<scope>runtime</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.17</version>
+			<scope>runtime</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Run shade goal on package phase -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>org.apache.flink:force-shading</exclude>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<!-- Do not copy the signatures in the META-INF folder.
+									Otherwise, this might cause SecurityExceptions when using the JAR. -->
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>${package}.FraudDetectionJob</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Java Compiler -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>1.8</source>
+					<target>1.8</target>
+				</configuration>
+			</plugin>
+
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.2.2</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>compile</goal>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Eclipse Scala Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+	<!-- This profile helps to make things run out of the box in IntelliJ -->
+	<!-- It adds Flink's core classes to the runtime classpath. -->
+	<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided'. -->
+	<profiles>
+		<profile>
+			<id>add-dependencies-for-IDEA</id>
+
+			<activation>
+				<property>
+					<name>idea.version</name>
+				</property>
+			</activation>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8be9b9a
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
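+# Keep Flink's own logging quiet (WARN), but raise the AlertSink logger to
+# INFO so the alerts produced by the walkthrough job appear on the console.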
+log4j.rootLogger=WARN, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=INFO
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
new file mode 100644
index 0000000..58e46e2
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+/**
+  * Skeleton code for the DataStream walkthrough.
+  */
+object FraudDetectionJob {
+
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
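+    // A source of simulated transaction events, provided by the
+    // flink-walkthrough-common module.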
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource)
+      .name("transactions")
+
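+    // Partition the stream by account id so that all transactions of an
+    // account are processed in order by the same detector instance.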
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId)
+      .process(new FraudDetector)
+      .name("fraud-detector")
+
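+    // The AlertSink logs each alert at INFO level (see the log4j
+    // configuration) instead of writing to an external system.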
+    alerts
+      .addSink(new AlertSink)
+      .name("send-alerts")
+
+    env.execute("Fraud Detection")
+  }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
new file mode 100644
index 0000000..6d7d91d
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+/**
+  * Skeleton code for implementing a fraud detector.
+  */
+object FraudDetector {
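+  // Thresholds for the walkthrough's fraud pattern (a small transaction
+  // immediately followed by a large one); not yet used in this skeleton.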
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
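+    // Skeleton behavior: raise an alert for every transaction. The
+    // walkthrough replaces this with stateful logic based on the
+    // thresholds above.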
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
diff --git a/flink-walkthroughs/pom.xml b/flink-walkthroughs/pom.xml
index 2733f59..ce499e7 100644
--- a/flink-walkthroughs/pom.xml
+++ b/flink-walkthroughs/pom.xml
@@ -36,6 +36,8 @@ under the License.
 		<module>flink-walkthrough-common</module>
 		<module>flink-walkthrough-table-java</module>
 		<module>flink-walkthrough-table-scala</module>
+		<module>flink-walkthrough-datastream-java</module>
+		<module>flink-walkthrough-datastream-scala</module>
 	</modules>
 	<build>
 		<extensions>