You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2019/09/18 08:09:38 UTC

[flink] 01/02: [FLINK-12746][docs] Add DataStream API Walkthrough

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df8f9a586143bbd719b6e9f03592e02e45629a9a
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Jul 22 16:01:44 2019 -0500

    [FLINK-12746][docs] Add DataStream API Walkthrough
    
    This closes #9201.
---
 docs/fig/fraud-transactions.svg                    |  71 ++
 .../getting-started/walkthroughs/datastream_api.md | 925 +++++++++++++++++++++
 .../walkthroughs/datastream_api.zh.md              | 925 +++++++++++++++++++++
 docs/getting-started/walkthroughs/table_api.md     |   2 +-
 docs/getting-started/walkthroughs/table_api.zh.md  |   2 +-
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 flink-end-to-end-tests/test-scripts/common.sh      |  12 +
 flink-end-to-end-tests/test-scripts/test_cli.sh    |  11 -
 ...throughs.sh => test_datastream_walkthroughs.sh} |  35 +-
 .../test-scripts/test_table_walkthroughs.sh        |   1 +
 .../flink/walkthrough/common/entity/Alert.java     |  61 ++
 .../flink/walkthrough/common/sink/AlertSink.java   |  43 +
 .../flink-walkthrough-datastream-java/pom.xml      |  37 +
 .../META-INF/maven/archetype-metadata.xml          |  36 +
 .../src/main/resources/archetype-resources/pom.xml | 225 +++++
 .../src/main/java/FraudDetectionJob.java           |  50 ++
 .../src/main/java/FraudDetector.java               |  48 ++
 .../src/main/resources/log4j.properties            |  24 +
 .../flink-walkthrough-datastream-scala/pom.xml     |  37 +
 .../META-INF/maven/archetype-metadata.xml          |  36 +
 .../src/main/resources/archetype-resources/pom.xml | 256 ++++++
 .../src/main/resources/log4j.properties            |  24 +
 .../src/main/scala/FraudDetectionJob.scala         |  51 ++
 .../src/main/scala/FraudDetector.scala             |  49 ++
 flink-walkthroughs/pom.xml                         |   2 +
 25 files changed, 2945 insertions(+), 20 deletions(-)

diff --git a/docs/fig/fraud-transactions.svg b/docs/fig/fraud-transactions.svg
new file mode 100644
index 0000000..f8e59d9
--- /dev/null
+++ b/docs/fig/fraud-transactions.svg
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<svg version="1.1" viewBox="0 0 842.6483154296875 203.27821350097656" fill="none" stroke="none" stroke-linecap="square" stroke-miterlimit="10" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg" width="100%" height="100%">
+    <rect id="svgEditorBackground" x="0" y="0" width="842.648" height="203.278" style="fill:none;stroke:none;" />
+    <clipPath id="p.0">
+        <path d="m0 0l842.6483 0l0 203.27821l-842.6483 0l0 -203.27821z" clip-rule="nonzero" />
+    </clipPath>
+    <g clip-path="url(#p.0)">
+        <path fill="#000000" fill-opacity="0.0" d="M0,0l842.6483,0l0,203.27821l-842.6483,0Z" fill-rule="evenodd" />
+        <path fill="#000000" fill-opacity="0.0" d="m203.24672 147.81758l0 -46.015747l138.67717 0l0 46.015747z" fill-rule="evenodd" />
+        <path fill="#000000" d="M265.14047,136.5832q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21876499999999,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.3593673700000011,-5.265625,-1.3124923999999965q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.93751499999999, [...]
+        <path fill="#000000" d="M608.6628,138.55434q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21875,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.359375,-5.265625,-1.3125q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.9375,0.4375q4.171875,0.703125,6.4375,2.40625q2 [...]
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.850395 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m119.850395 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m195.85039 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m271.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m347.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m423.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m499.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m575.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m651.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m727.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m803.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 19.552494l760.9974 0" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 54.749344l760.9974 0" fill-rule="nonzero" />
+        <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 94.749344l760.9974 0" fill-rule="nonzero" />
+        <path fill="#000000" d="m68.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 0 [...]
+        <path fill="#000000" d="m144.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m220.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m296.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m372.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m448.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m524.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m600.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m676.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625  [...]
+        <path fill="#000000" d="m748.9306 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 0 [...]
+        <path fill="#000000" d="m64.78021 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" d="m140.78021 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m220.48645 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m289.07397 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m365.07397 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m444.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" d="m524.48645 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+        <path fill="#000000" d="m596.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" d="m669.074 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.609375 [...]
+        <path fill="#000000" d="m748.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+        <path fill="#000000" fill-opacity="0.0" d="m236.08136 158.07217l73.00787 0l0 68.28346l-73.00787 0z" fill-rule="evenodd" />
+        <path fill="#000000" d="m246.61261 184.99217l0 -13.359375l9.015625 0l0 1.578125l-7.25 0l0 4.140625l6.265625 0l0 1.578125l-6.265625 0l0 6.0625l-1.765625 0zm11.083496 0l0 -9.671875l1.46875 0l0 1.46875q0.5625 -1.03125 1.03125 -1.359375q0.484375 -0.328125 1.0625 -0.328125q0.828125 0 1.6875 0.53125l-0.5625 1.515625q-0.609375 -0.359375 -1.203125 -0.359375q-0.546875 0 -0.96875 0.328125q-0.421875 0.328125 -0.609375 0.890625q-0.28125 0.875 -0.28125 1.921875l0 5.0625l-1.625 0zm12.540802 -1 [...]
+        <path fill="#000000" fill-opacity="0.0" d="m563.58795 160.0433l105.03937 0l0 68.28346l-105.03937 0z" fill-rule="evenodd" />
+        <path fill="#000000" d="m574.0098 186.9633l0 -13.359375l1.8125 0l7.015625 10.484375l0 -10.484375l1.6875 0l0 13.359375l-1.8125 0l-7.015625 -10.5l0 10.5l-1.6875 0zm12.676025 -4.84375q0 -2.6875 1.484375 -3.96875q1.25 -1.078125 3.046875 -1.078125q2.0 0 3.265625 1.3125q1.265625 1.296875 1.265625 3.609375q0 1.859375 -0.5625 2.9375q-0.5625 1.0625 -1.640625 1.65625q-1.0625 0.59375 -2.328125 0.59375q-2.03125 0 -3.28125 -1.296875q-1.25 -1.3125 -1.25 -3.765625zm1.6875 0q0 1.859375 0.796875  [...]
+    </g>
+</svg>
diff --git a/docs/getting-started/walkthroughs/datastream_api.md b/docs/getting-started/walkthroughs/datastream_api.md
new file mode 100644
index 0000000..8676ae4
--- /dev/null
+++ b/docs/getting-started/walkthroughs/datastream_api.md
@@ -0,0 +1,925 @@
+---
+title: "DataStream API"
+nav-id: datastreamwalkthrough
+nav-title: 'DataStream API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a DataStream API for building robust, stateful streaming applications.
+It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems.
+In this step-by-step guide you'll learn how to build a stateful streaming application with Flink's DataStream API.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building? 
+
+Credit card fraud is a growing concern in the digital age.
+Criminals steal credit card numbers by running scams or hacking into insecure systems.
+Stolen numbers are tested by making one or more small purchases, often for a dollar or less.
+If that works, they then make more significant purchases to get items they can sell or keep for themselves.
+
+In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions.
+Using a simple set of rules, you will see how Flink allows you to implement advanced business logic and act in real-time.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+
+## How to Follow Along
+
+If you want to follow along, you will require a computer with:
+
+* Java 8 
+* Maven 
+
+A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly, so you only need to focus on filling out the business logic.
+These dependencies include `flink-streaming-java`, which is the core dependency for all Flink streaming applications, and `flink-walkthrough-common`, which has data generators and other classes specific to this walkthrough.
+
+{% panel **Note:** Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available [at the bottom of the page](#final-application). %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">official Maven documentation</a>.
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
+Maven will create a folder named `frauddetection` that contains a project with all the dependencies to complete this tutorial.
+After importing the project into your editor, you can find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code which you can run directly inside your IDE.
+Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Transaction> transactions = env
+            .addSource(new TransactionSource())
+            .name("transactions");
+        
+        DataStream<Alert> alerts = transactions
+            .keyBy(Transaction::getAccountId)
+            .process(new FraudDetector())
+            .name("fraud-detector");
+
+        alerts
+            .addSink(new AlertSink())
+            .name("send-alerts");
+
+        env.execute("Fraud Detection");
+    }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+#### FraudDetectionJob.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+object FraudDetectionJob {
+
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource)
+      .name("transactions")
+
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId)
+      .process(new FraudDetector)
+      .name("fraud-detector")
+
+    alerts
+      .addSink(new AlertSink)
+      .name("send-alerts")
+
+    env.execute("Fraud Detection")
+  }
+}
+{% endhighlight %}
+
+#### FraudDetector.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down the Code
+
+Let's walk step-by-step through the code of these two files. The `FraudDetectionJob` class defines the data flow of the application and the `FraudDetector` class defines the business logic of the function that detects fraudulent transactions.
+
+We start by describing how the Job is assembled in the `main` method of the `FraudDetectionJob` class.
+
+#### The Execution Environment
+
+The first line sets up your `StreamExecutionEnvironment`.
+The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+{% endhighlight %}
+</div>
+</div>
+
+#### Creating a Source
+
+Sources ingest data from external systems, such as Apache Kafka, RabbitMQ, or Apache Pulsar, into Flink Jobs.
+This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process.
+Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) of when the transaction occurred, and US$ amount (`amount`).
+The `name` attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+    .addSource(new TransactionSource())
+    .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions: DataStream[Transaction] = env
+  .addSource(new TransactionSource)
+  .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Partitioning Events & Detecting Fraud
+
+The `transactions` stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.
+
+To ensure that the same physical task processes all records for a particular key, you can partition a stream using `DataStream#keyBy`. 
+The `process()` call adds an operator that applies a function to each partitioned element in the stream.
+It is common to say the operator immediately after a `keyBy`, in this case `FraudDetector`, is executed within a _keyed context_.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Alert> alerts = transactions
+    .keyBy(Transaction::getAccountId)
+    .process(new FraudDetector())
+    .name("fraud-detector");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val alerts: DataStream[Alert] = transactions
+  .keyBy(transaction => transaction.getAccountId)
+  .process(new FraudDetector)
+  .name("fraud-detector")
+{% endhighlight %}
+</div>
+</div>
+
+#### Outputting Results
+ 
+A sink writes a `DataStream` to an external system, such as Apache Kafka, Cassandra, or AWS Kinesis.
+The `AlertSink` logs each `Alert` record with log level **INFO**, instead of writing it to persistent storage, so you can easily see your results.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+alerts.addSink(new AlertSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+alerts.addSink(new AlertSink)
+{% endhighlight %}
+</div>
+</div>
+
+#### Executing the Job
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+Call `StreamExecutionEnvironment#execute` to begin the execution of our Job and give it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Fraud Detection");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Fraud Detection")
+{% endhighlight %}
+</div>
+</div>
+
+#### The Fraud Detector
+
+The fraud detector is implemented as a `KeyedProcessFunction`.
+Its method `KeyedProcessFunction#processElement` is called for every transaction event.
+This first version produces an alert on every transaction, which some may say is overly conservative.
+
+The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+  
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Writing a Real Application (v1)
+
+For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one, where small is anything less than $1.00 and large is more than $500.
+Imagine your fraud detector processes the following stream of transactions for a particular account.
+
+<p class="text-center">
+    <img alt="Transactions" width="80%" src="{{ site.baseurl }}/fig/fraud-transactions.svg"/>
+</p>
+
+Transactions 3 and 4 should be marked as fraudulent because they are a small transaction, $0.09, followed by a large one, $510.
+In contrast, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl }}/concepts/glossary.html#managed-state), and that is why we decided to use a [KeyedProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html). 
+It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is set for that account.
+
+However, merely implementing the flag as a member variable in the `FraudDetector` class will not work. 
+Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means if accounts A and B are routed through the same instance of `FraudDetector`, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert. 
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys; however, a simple member variable would not be fault-tolerant, and all its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
+
+To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.
+
+The most basic type of state in Flink is [ValueState]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state), a data type that adds fault tolerance to any variable it wraps.
+`ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_, that is, any operator immediately following `DataStream#keyBy`.
+A _keyed state_ of an operator is automatically scoped to the key of the record that is currently processed.
+In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account. 
+`ValueState` is created using a `ValueStateDescriptor` which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data.
+The right hook for this is the `open()` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient ValueState<Boolean> flagState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` in the Java standard library.
+It provides three methods for interacting with its contents: `update` sets the state, `value` gets the current value, and `clear` deletes its contents.
+If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`.
+Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, and so all changes must be performed with `ValueState#update`.
+Otherwise, fault tolerance is managed automatically by Flink under the hood, and so you can interact with it like with any standard variable.
+
+Below, you can see an example of how you can use a flag state to track potential fraudulent transactions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                // Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);            
+            }
+
+            // Clean up our state
+            flagState.clear();
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // Set the flag to true
+            flagState.update(true);
+        }
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      flagState.clear()
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+    }
+  }
+{% endhighlight %}
+</div>
+</div>
+
+For every transaction, the fraud detector checks the state of the flag for that account.
+Remember, `ValueState` is always scoped to the current key, i.e., account.
+If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert.
+
+After that check, the flag state is unconditionally cleared.
+Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted.
+
+Finally, the transaction amount is checked to see if it is small.
+If so, then the flag is set so that it can be checked by the next event.
+Notice that `ValueState<Boolean>` actually has three states, unset (`null`), `true`, and `false`, because all `ValueState`s are nullable.
+This job only makes use of unset (`null`) and `true` to check whether the flag is set or not.
+
+## Fraud Detector v2: State + Time = &#10084;&#65039;
+
+Scammers don't wait long to make their large purchase to reduce the chances their test transaction is noticed. 
+For example, suppose you wanted to add a 1-minute timeout to your fraud detector; i.e., in the previous example transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other.
+Flink's `KeyedProcessFunction` allows you to set timers which invoke a callback method at some point in time in the future.
+
+Let's see how we can modify our Job to comply with our new requirements:
+
+* Whenever the flag is set to `true`, also set a timer for 1 minute in the future.
+* When the timer fires, reset the flag by clearing its state.
+* If the flag is ever cleared, the timer should be canceled.
+
+To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service.
+The timer service can be used to query the current time, register timers, and delete timers.
+With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+if (transaction.getAmount() < SMALL_AMOUNT) {
+    // set the flag to true
+    flagState.update(true);
+
+    // set the timer and timer state
+    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+    context.timerService().registerProcessingTimeTimer(timer);
+    timerState.update(timer);
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+  // set the flag to true
+  flagState.update(true)
+
+  // set the timer and timer state
+  val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+  context.timerService.registerProcessingTimeTimer(timer)
+  timerState.update(timer)
+}
+{% endhighlight %}
+</div>
+</div>
+
+Processing time is wall clock time, and is determined by the system clock of the machine running the operator. 
+
+When a timer fires, it calls `KeyedProcessFunction#onTimer`. 
+Overriding this method is how you can implement your callback to reset the flag.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+    // remove flag after 1 minute
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(
+    timestamp: Long,
+    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+    out: Collector[Alert]): Unit = {
+  // remove flag after 1 minute
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.
+You can wrap this in a helper method and call this method instead of `flagState.clear()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+private void cleanUp(Context ctx) throws Exception {
+    // delete timer
+    Long timer = timerState.value();
+    ctx.timerService().deleteProcessingTimeTimer(timer);
+
+    // clean up all state
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@throws[Exception]
+private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+  // delete timer
+  val timer = timerState.value
+  ctx.timerService.deleteProcessingTimeTimer(timer)
+
+  // clean up all states
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                //Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);
+            }
+            // Clean up our state
+            cleanUp(context);
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // set the flag to true
+            flagState.update(true);
+
+            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+            context.timerService().registerProcessingTimeTimer(timer);
+
+            timerState.update(timer);
+        }
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+        // remove flag after 1 minute
+        timerState.clear();
+        flagState.clear();
+    }
+
+    private void cleanUp(Context ctx) throws Exception {
+        // delete timer
+        Long timer = timerState.value();
+        ctx.timerService().deleteProcessingTimeTimer(timer);
+
+        // clean up all state
+        timerState.clear();
+        flagState.clear();
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      cleanUp(context)
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+
+      context.timerService.registerProcessingTimeTimer(timer)
+      timerState.update(timer)
+    }
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+      out: Collector[Alert]): Unit = {
+    // remove flag after 1 minute
+    timerState.clear()
+    flagState.clear()
+  }
+
+  @throws[Exception]
+  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+    // delete timer
+    val timer = timerState.value
+    ctx.timerService.deleteProcessingTimeTimer(timer)
+
+    // clean up all states
+    timerState.clear()
+    flagState.clear()
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Expected Output
+
+Running this code with the provided `TransactionSource` will emit fraud alerts for account 3.
+You should see the following output in your task manager logs: 
+
+{% highlight bash %}
+2019-08-19 14:22:06,220 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:11,383 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+{% endhighlight %}
diff --git a/docs/getting-started/walkthroughs/datastream_api.zh.md b/docs/getting-started/walkthroughs/datastream_api.zh.md
new file mode 100644
index 0000000..8676ae4
--- /dev/null
+++ b/docs/getting-started/walkthroughs/datastream_api.zh.md
@@ -0,0 +1,925 @@
+---
+title: "DataStream API"
+nav-id: datastreamwalkthrough
+nav-title: 'DataStream API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a DataStream API for building robust, stateful streaming applications.
+It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems.
+In this step-by-step guide you'll learn how to build a stateful streaming application with Flink's DataStream API.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building? 
+
+Credit card fraud is a growing concern in the digital age.
+Criminals steal credit card numbers by running scams or hacking into insecure systems.
+Stolen numbers are tested by making one or more small purchases, often for a dollar or less.
+If that works, they then make more significant purchases to get items they can sell or keep for themselves.
+
+In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions.
+Using a simple set of rules, you will see how Flink allows us to implement advanced business logic and act in real-time.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+
+## How to Follow Along
+
+If you want to follow along, you will require a computer with:
+
+* Java 8 
+* Maven 
+
+A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly, so you only need to focus on filling out the business logic.
+These dependencies include `flink-streaming-java` which is the core dependency for all Flink streaming applications and `flink-walkthrough-common` that has data generators and other classes specific to this walkthrough.
+
+{% panel **Note:** Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available [at the bottom of the page](#final-application). %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a>
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
+Maven will create a folder named `frauddetection` that contains a project with all the dependencies to complete this tutorial.
+After importing the project into your editor, you can find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code which you can run directly inside your IDE.
+Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Transaction> transactions = env
+            .addSource(new TransactionSource())
+            .name("transactions");
+        
+        DataStream<Alert> alerts = transactions
+            .keyBy(Transaction::getAccountId)
+            .process(new FraudDetector())
+            .name("fraud-detector");
+
+        alerts
+            .addSink(new AlertSink())
+            .name("send-alerts");
+
+        env.execute("Fraud Detection");
+    }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+#### FraudDetectionJob.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+object FraudDetectionJob {
+
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource)
+      .name("transactions")
+
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId)
+      .process(new FraudDetector)
+      .name("fraud-detector")
+
+    alerts
+      .addSink(new AlertSink)
+      .name("send-alerts")
+
+    env.execute("Fraud Detection")
+  }
+}
+{% endhighlight %}
+
+#### FraudDetector.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down the Code
+
+Let's walk step-by-step through the code of these two files. The `FraudDetectionJob` class defines the data flow of the application and the `FraudDetector` class defines the business logic of the function that detects fraudulent transactions.
+
+We start by describing how the Job is assembled in the `main` method of the `FraudDetectionJob` class.
+
+#### The Execution Environment
+
+The first line sets up your `StreamExecutionEnvironment`.
+The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+{% endhighlight %}
+</div>
+</div>
+
+#### Creating a Source
+
+Sources ingest data from external systems, such as Apache Kafka, Rabbit MQ, or Apache Pulsar, into Flink Jobs.
+This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process.
+Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) of when the transaction occurred, and US$ amount (`amount`).
+The `name` attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+    .addSource(new TransactionSource())
+    .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions: DataStream[Transaction] = env
+  .addSource(new TransactionSource)
+  .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Partitioning Events & Detecting Fraud
+
+The `transactions` stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.
+
+To ensure that the same physical task processes all records for a particular key, you can partition a stream using `DataStream#keyBy`. 
+The `process()` call adds an operator that applies a function to each partitioned element in the stream.
+It is common to say the operator immediately after a `keyBy`, in this case `FraudDetector`, is executed within a _keyed context_.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Alert> alerts = transactions
+    .keyBy(Transaction::getAccountId)
+    .process(new FraudDetector())
+    .name("fraud-detector");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val alerts: DataStream[Alert] = transactions
+  .keyBy(transaction => transaction.getAccountId)
+  .process(new FraudDetector)
+  .name("fraud-detector")
+{% endhighlight %}
+</div>
+</div>
+
+#### Outputting Results
+ 
+A sink writes a `DataStream` to an external system, such as Apache Kafka, Cassandra, or AWS Kinesis.
+The `AlertSink` logs each `Alert` record with log level **INFO**, instead of writing it to persistent storage, so you can easily see your results.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+alerts.addSink(new AlertSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+alerts.addSink(new AlertSink)
+{% endhighlight %}
+</div>
+</div>
+
+#### Executing the Job
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+Call `StreamExecutionEnvironment#execute` to begin the execution of our Job and give it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Fraud Detection");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Fraud Detection")
+{% endhighlight %}
+</div>
+</div>
+
+#### The Fraud Detector
+
+The fraud detector is implemented as a `KeyedProcessFunction`.
+Its method `KeyedProcessFunction#processElement` is called for every transaction event.
+This first version produces an alert on every transaction, which some may say is overly conservative.
+
+The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+  
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Writing a Real Application (v1)
+
+For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one, where small is anything less than $1.00 and large is more than $500.
+Imagine your fraud detector processes the following stream of transactions for a particular account.
+
+<p class="text-center">
+    <img alt="Transactions" width="80%" src="{{ site.baseurl }}/fig/fraud-transactions.svg"/>
+</p>
+
+Transactions 3 and 4 should be marked as fraudulent because transaction 3 is a small transaction, $0.09, and it is immediately followed by a large one, $510.
+Alternatively, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl }}/concepts/glossary.html#managed-state), and that is why we decided to use a [KeyedProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html). 
+It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is set for that account.
+
+However, merely implementing the flag as a member variable in the `FraudDetector` class will not work. 
+Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means if accounts A and B are routed through the same instance of `FraudDetector`, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert. 
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys; however, a simple member variable would not be fault-tolerant, and all its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
+
+To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.
+
+The most basic type of state in Flink is [ValueState]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state), a data type that adds fault tolerance to any variable it wraps.
+`ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_, that is, any operator immediately following `DataStream#keyBy`.
+A _keyed state_ of an operator is automatically scoped to the key of the record that is currently processed.
+In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account. 
+`ValueState` is created using a `ValueStateDescriptor` which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data.
+The right hook for this is the `open()` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient ValueState<Boolean> flagState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` in the Java standard library.
+It provides three methods for interacting with its contents: `update` sets the state, `value` gets the current value, and `clear` deletes its contents.
+If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`.
+Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, and so all changes must be performed with `ValueState#update`.
+Otherwise, fault tolerance is managed automatically by Flink under the hood, and so you can interact with it like with any standard variable.
+
+Below, you can see an example of how you can use a flag state to track potential fraudulent transactions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                // Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);            
+            }
+
+            // Clean up our state
+            flagState.clear();
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // Set the flag to true
+            flagState.update(true);
+        }
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      flagState.clear()
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+    }
+  }
+{% endhighlight %}
+</div>
+</div>
+
+For every transaction, the fraud detector checks the state of the flag for that account.
+Remember, `ValueState` is always scoped to the current key, i.e., account.
+If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert.
+
+After that check, the flag state is unconditionally cleared.
+Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted.
+
+Finally, the transaction amount is checked to see if it is small.
+If so, then the flag is set so that it can be checked by the next event.
+Notice that `ValueState<Boolean>` actually has three states, unset (`null`), `true`, and `false`, because every `ValueState` is nullable.
+This job only makes use of unset (`null`) and `true` to check whether the flag is set or not.
+
+## Fraud Detector v2: State + Time = &#10084;&#65039;
+
+Scammers don't wait long to make their large purchase to reduce the chances their test transaction is noticed. 
+For example, suppose you wanted to set a 1 minute timeout to your fraud detector; i.e., in the previous example transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other.
+Flink's `KeyedProcessFunction` allows you to set timers which invoke a callback method at some point in time in the future.
+
+Let's see how we can modify our Job to comply with our new requirements:
+
+* Whenever the flag is set to `true`, also set a timer for 1 minute in the future.
+* When the timer fires, reset the flag by clearing its state.
+* If the flag is ever cleared the timer should be canceled.
+
+To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service.
+The timer service can be used to query the current time, register timers, and delete timers.
+With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+if (transaction.getAmount() < SMALL_AMOUNT) {
+    // set the flag to true
+    flagState.update(true);
+
+    // set the timer and timer state
+    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+    context.timerService().registerProcessingTimeTimer(timer);
+    timerState.update(timer);
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+  // set the flag to true
+  flagState.update(true)
+
+  // set the timer and timer state
+  val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+  context.timerService.registerProcessingTimeTimer(timer)
+  timerState.update(timer)
+}
+{% endhighlight %}
+</div>
+</div>
+
+Processing time is wall clock time, and is determined by the system clock of the machine running the operator. 
+
+When a timer fires, it calls `KeyedProcessFunction#onTimer`. 
+Overriding this method is how you can implement your callback to reset the flag.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+    // remove flag after 1 minute
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(
+    timestamp: Long,
+    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+    out: Collector[Alert]): Unit = {
+  // remove flag after 1 minute
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.
+You can wrap this in a helper method and call this method instead of `flagState.clear()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+private void cleanUp(Context ctx) throws Exception {
+    // delete timer
+    Long timer = timerState.value();
+    ctx.timerService().deleteProcessingTimeTimer(timer);
+
+    // clean up all state
+    timerState.clear();
+    flagState.clear();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@throws[Exception]
+private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+  // delete timer
+  val timer = timerState.value
+  ctx.timerService.deleteProcessingTimeTimer(timer)
+
+  // clean up all states
+  timerState.clear()
+  flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                //Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);
+            }
+            // Clean up our state
+            cleanUp(context);
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // set the flag to true
+            flagState.update(true);
+
+            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+            context.timerService().registerProcessingTimeTimer(timer);
+
+            timerState.update(timer);
+        }
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+        // remove flag after 1 minute
+        timerState.clear();
+        flagState.clear();
+    }
+
+    private void cleanUp(Context ctx) throws Exception {
+        // delete timer
+        Long timer = timerState.value();
+        ctx.timerService().deleteProcessingTimeTimer(timer);
+
+        // clean up all state
+        timerState.clear();
+        flagState.clear();
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      cleanUp(context)
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+
+      context.timerService.registerProcessingTimeTimer(timer)
+      timerState.update(timer)
+    }
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+      out: Collector[Alert]): Unit = {
+    // remove flag after 1 minute
+    timerState.clear()
+    flagState.clear()
+  }
+
+  @throws[Exception]
+  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+    // delete timer
+    val timer = timerState.value
+    ctx.timerService.deleteProcessingTimeTimer(timer)
+
+    // clean up all states
+    timerState.clear()
+    flagState.clear()
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Expected Output
+
+Running this code with the provided `TransactionSource` will emit fraud alerts for account 3.
+You should see the following output in your task manager logs: 
+
+{% highlight bash %}
+2019-08-19 14:22:06,220 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:11,383 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
+{% endhighlight %}
diff --git a/docs/getting-started/walkthroughs/table_api.md b/docs/getting-started/walkthroughs/table_api.md
index 878fb54..f0cbb62 100644
--- a/docs/getting-started/walkthroughs/table_api.md
+++ b/docs/getting-started/walkthroughs/table_api.md
@@ -3,7 +3,7 @@ title: "Table API"
 nav-id: tableapiwalkthrough
 nav-title: 'Table API'
 nav-parent_id: walkthroughs
-nav-pos: 1
+nav-pos: 2
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/walkthroughs/table_api.zh.md b/docs/getting-started/walkthroughs/table_api.zh.md
index 878fb54..f0cbb62 100644
--- a/docs/getting-started/walkthroughs/table_api.zh.md
+++ b/docs/getting-started/walkthroughs/table_api.zh.md
@@ -3,7 +3,7 @@ title: "Table API"
 nav-id: tableapiwalkthrough
 nav-title: 'Table API'
 nav-parent_id: walkthroughs
-nav-pos: 1
+nav-pos: 2
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 3b34b560..9307188 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -140,6 +140,8 @@ run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scrip
 
 run_test "Walkthrough Table Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh java"
 run_test "Walkthrough Table Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh scala"
+run_test "Walkthrough DataStream Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh java"
+run_test "Walkthrough DataStream Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh scala"
 
 run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_gcp_pubsub.sh"
 
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 2dc3787..bdecb32 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -745,3 +745,15 @@ function retry_times() {
     return 1
 }
 
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+
+function extract_job_id_from_job_submission_return() {
+    # Echoes the JobID parsed from the submission output "$1"; returns 1 when none is found.
+    JOB_ID=""
+    if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; then
+        JOB_ID="${BASH_REMATCH[1]}"
+    fi
+    echo "$JOB_ID"
+    [[ -n "$JOB_ID" ]] # exit status: lets callers' "EXIT_CODE=$?" detect a failed extraction
+}
+
diff --git a/flink-end-to-end-tests/test-scripts/test_cli.sh b/flink-end-to-end-tests/test-scripts/test_cli.sh
index b9d285b..4b04890 100755
--- a/flink-end-to-end-tests/test-scripts/test_cli.sh
+++ b/flink-end-to-end-tests/test-scripts/test_cli.sh
@@ -29,23 +29,12 @@ $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start
 
 # CLI regular expressions
-JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
 JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\""
 JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\""
 JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :"
 
 EXIT_CODE=0
 
-function extract_job_id_from_job_submission_return() {
-    if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
-        then
-            JOB_ID="${BASH_REMATCH[1]}";
-        else
-            JOB_ID=""
-        fi
-    echo "$JOB_ID"
-}
-
 function extract_valid_pact_from_job_info_return() {
     PACT_MATCH=0
     if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]];
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
similarity index 75%
copy from flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
copy to flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
index 77afc58..e976927 100755
--- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
+++ b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
@@ -19,7 +19,7 @@
 
 # End to end test for quick starts test.
 # Usage:
-# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh <Type (java or scala)>
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh <Type (java or scala)>
 
 source "$(dirname "$0")"/common.sh
 
@@ -28,12 +28,12 @@ TEST_TYPE=$1
 mkdir -p "${TEST_DATA_DIR}"
 cd "${TEST_DATA_DIR}"
 
-ARTIFACT_ID=flink-walkthrough-table-${TEST_TYPE}
+ARTIFACT_ID=flink-walkthrough-datastream-${TEST_TYPE}
 ARTIFACT_VERSION=0.1
 
 mvn archetype:generate                                          \
     -DarchetypeGroupId=org.apache.flink                         \
-    -DarchetypeArtifactId=flink-walkthrough-table-${TEST_TYPE}  \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-${TEST_TYPE}  \
     -DarchetypeVersion=${FLINK_VERSION}                         \
     -DgroupId=org.apache.flink.walkthrough                      \
     -DartifactId=${ARTIFACT_ID}                                 \
@@ -46,7 +46,8 @@ cd "${ARTIFACT_ID}"
 mvn clean package -nsu > compile-output.txt
 
 if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
-    echo "Failure: The walk-through did not successfully compile"
+    echo "Failure: The walkthrough did not successfully compile"
+    cat compile-output.txt
     exit 1
 fi
 
@@ -67,8 +68,28 @@ fi
 
 TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar
 
-add_optional_lib "table"
-
 start_cluster
 
-${FLINK_DIR}/bin/flink run "$TEST_PROGRAM_JAR"
+JOB_ID=""
+EXIT_CODE=0
+
+RETURN=`$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR`
+echo "$RETURN"
+JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"`
+EXIT_CODE=$? # expect matching job id extraction
+
+if [ $EXIT_CODE == 0 ]; then # submission step succeeded: verify the job is listed as running
+    RETURN=`$FLINK_DIR/bin/flink list -r`
+    echo "$RETURN"
+    if [[ -z "$JOB_ID" || "$RETURN" != *"$JOB_ID"* ]]; then # fail unless the job id is listed
+        echo "[FAIL] Unable to submit walkthrough"
+        EXIT_CODE=1
+    fi
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+    eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}"
+    EXIT_CODE=$?
+fi
+
+exit $EXIT_CODE
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
index 77afc58..f53e972 100755
--- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
+++ b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
@@ -47,6 +47,7 @@ mvn clean package -nsu > compile-output.txt
 
 if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
     echo "Failure: The walk-through did not successfully compile"
+    cat compile-output.txt
     exit 1
 fi
 
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
new file mode 100644
index 0000000..9678fdb
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.entity;
+
+import java.util.Objects;
+
+/**
+ * A simple alert event.
+ */
+@SuppressWarnings("unused")
+public final class Alert {
+
+	private long id;
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		Alert alert = (Alert) o;
+		return id == alert.id;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(id);
+	}
+
+	@Override
+	public String toString() {
+		return "Alert{" +
+			"id=" + id +
+			'}';
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
new file mode 100644
index 0000000..4332a8e
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.walkthrough.common.entity.Alert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A sink for outputting alerts.
+ */
+@PublicEvolving
+@SuppressWarnings("unused")
+public class AlertSink implements SinkFunction<Alert> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class);
+
+	@Override
+	public void invoke(Alert value, Context context) {
+		LOG.info(value.toString());
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml
new file mode 100644
index 0000000..8788a0f
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-walkthroughs</artifactId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-walkthrough-datastream-java</artifactId>
+	<packaging>maven-archetype</packaging>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..ed235f5
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+	xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+	name="flink-walkthrough-datastream-java">
+	<fileSets>
+		<fileSet filtered="true" packaged="true" encoding="UTF-8">
+			<directory>src/main/java</directory>
+			<includes>
+				<include>**/*.java</include>
+			</includes>
+		</fileSet>
+		<fileSet encoding="UTF-8">
+			<directory>src/main/resources</directory>
+		</fileSet>
+	</fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..8229d78
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,225 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>${groupId}</groupId>
+	<artifactId>${artifactId}</artifactId>
+	<version>${version}</version>
+	<packaging>jar</packaging>
+
+	<name>Flink Walkthrough DataStream Java</name>
+	<url>https://flink.apache.org</url>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flink.version>@project.version@</flink.version>
+		<java.version>1.8</java.version>
+		<scala.binary.version>2.11</scala.binary.version>
+		<maven.compiler.source>${java.version}</maven.compiler.source>
+		<maven.compiler.target>${java.version}</maven.compiler.target>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>apache.snapshots</id>
+			<name>Apache Development Snapshot Repository</name>
+			<url>https://repository.apache.org/content/repositories/snapshots/</url>
+			<releases>
+				<enabled>false</enabled>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
+		<!-- This dependency is provided, because it should not be packaged into the JAR file. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+		<!-- Example:
+
+		<dependency>
+		    <groupId>org.apache.flink</groupId>
+		    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+		    <version>${flink.version}</version>
+		</dependency>
+		-->
+
+		<!-- Add logging framework, to produce console output when running in the IDE. -->
+		<!-- These dependencies are excluded from the application JAR by default. -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.7.7</version>
+			<scope>runtime</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.17</version>
+			<scope>runtime</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+
+			<!-- Java Compiler -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>${java.version}</source>
+					<target>${java.version}</target>
+				</configuration>
+			</plugin>
+
+			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Run shade goal on package phase -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>org.apache.flink:force-shading</exclude>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>${package}.FraudDetectionJob</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+
+				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-shade-plugin</artifactId>
+										<versionRange>[3.0.0,)</versionRange>
+										<goals>
+											<goal>shade</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-compiler-plugin</artifactId>
+										<versionRange>[3.1,)</versionRange>
+										<goals>
+											<goal>testCompile</goal>
+											<goal>compile</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
+	<!-- This profile helps to make things run out of the box in IntelliJ -->
+	<!-- It adds Flink's core classes to the runtime class path. -->
+	<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+	<profiles>
+		<profile>
+			<id>add-dependencies-for-IDEA</id>
+
+			<activation>
+				<property>
+					<name>idea.version</name>
+				</property>
+			</activation>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
new file mode 100644
index 0000000..46019fd
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+/**
+ * Skeleton code for the DataStream API walkthrough.
+ */
+public class FraudDetectionJob {
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Transaction> transactions = env
+			.addSource(new TransactionSource())
+			.name("transactions");
+
+		DataStream<Alert> alerts = transactions
+			.keyBy(Transaction::getAccountId)
+			.process(new FraudDetector())
+			.name("fraud-detector");
+
+		alerts
+			.addSink(new AlertSink())
+			.name("send-alerts");
+
+		env.execute("Fraud Detection");
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
new file mode 100644
index 0000000..e5c034c
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+/**
+ * Skeleton code for implementing a fraud detector.
+ */
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final double SMALL_AMOUNT = 1.00;
+	private static final double LARGE_AMOUNT = 500.00;
+	private static final long ONE_MINUTE = 60 * 1000;
+
+	@Override
+	public void processElement(
+			Transaction transaction,
+			Context context,
+			Collector<Alert> collector) throws Exception {
+
+		Alert alert = new Alert();
+		alert.setId(transaction.getAccountId());
+
+		collector.collect(alert);
+	}
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8be9b9a
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=info
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
new file mode 100644
index 0000000..3e35e27
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-walkthroughs</artifactId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-walkthrough-datastream-scala</artifactId>
+	<packaging>maven-archetype</packaging>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..e78dc8d
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+	xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+	name="flink-walkthrough-datastream-scala">
+	<fileSets>
+		<fileSet filtered="true" packaged="true" encoding="UTF-8">
+			<directory>src/main/scala</directory>
+			<includes>
+				<include>**/*.scala</include>
+			</includes>
+		</fileSet>
+		<fileSet encoding="UTF-8">
+			<directory>src/main/resources</directory>
+		</fileSet>
+	</fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..93956fa
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,256 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>${groupId}</groupId>
+	<artifactId>${artifactId}</artifactId>
+	<version>${version}</version>
+	<packaging>jar</packaging>
+
+	<name>Flink Walkthrough DataStream Scala</name>
+	<url>https://flink.apache.org</url>
+
+	<repositories>
+		<repository>
+			<id>apache.snapshots</id>
+			<name>Apache Development Snapshot Repository</name>
+			<url>https://repository.apache.org/content/repositories/snapshots/</url>
+			<releases>
+				<enabled>false</enabled>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flink.version>@project.version@</flink.version>
+		<scala.binary.version>2.11</scala.binary.version>
+		<scala.version>2.11.12</scala.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
+		<!-- Apache Flink dependencies -->
+		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+		<!-- Example:
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		-->
+
+		<!-- Add logging framework, to produce console output when running in the IDE. -->
+		<!-- These dependencies are excluded from the application JAR by default. -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.7.7</version>
+			<scope>runtime</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.17</version>
+			<scope>runtime</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<!-- Run shade goal on package phase -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>org.apache.flink:force-shading</exclude>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<!-- Do not copy the signatures in the META-INF folder.
+									Otherwise, this might cause SecurityExceptions when using the JAR. -->
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>${package}.FraudDetectionJob</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Java Compiler -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>1.8</source>
+					<target>1.8</target>
+				</configuration>
+			</plugin>
+
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.2.2</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>compile</goal>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Eclipse Scala Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+	<!-- This profile helps to make things run out of the box in IntelliJ -->
+	<!-- It adds Flink's core classes to the runtime class path. -->
+	<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+	<profiles>
+		<profile>
+			<id>add-dependencies-for-IDEA</id>
+
+			<activation>
+				<property>
+					<name>idea.version</name>
+				</property>
+			</activation>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+					<version>${flink.version}</version>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8be9b9a
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=Warn, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=info
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
new file mode 100644
index 0000000..58e46e2
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+/**
+  * Skeleton code for the DataStream code walkthrough
+  */
+object FraudDetectionJob {
+
+  /** Builds the source, keyed detector and sink pipeline, then executes it. */
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // Source operator, registered under the name "transactions".
+    val txSource: DataStream[Transaction] =
+      streamEnv.addSource(new TransactionSource).name("transactions")
+
+    // Partition the stream by account id before applying FraudDetector.
+    val detected: DataStream[Alert] =
+      txSource.keyBy(_.getAccountId).process(new FraudDetector).name("fraud-detector")
+
+    // Sink operator, registered under the name "send-alerts".
+    detected.addSink(new AlertSink).name("send-alerts")
+
+    // Execute the assembled pipeline.
+    streamEnv.execute("Fraud Detection")
+  }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
new file mode 100644
index 0000000..6d7d91d
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+/**
+  * Skeleton code for implementing a fraud detector.
+  */
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.0D    // not referenced by the skeleton class yet
+  val LARGE_AMOUNT: Double = 500.0D  // not referenced by the skeleton class yet
+  val ONE_MINUTE: Long     = 60L * 1000L // 60000; presumably milliseconds -- confirm
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  /** Emits one [[Alert]] per incoming transaction, carrying that transaction's account id. */
+  @throws[Exception]
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+    // Skeleton behaviour: no filtering yet, so every transaction produces an alert.
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+    collector.collect(alert)
+  }
+}
diff --git a/flink-walkthroughs/pom.xml b/flink-walkthroughs/pom.xml
index 2733f59..ce499e7 100644
--- a/flink-walkthroughs/pom.xml
+++ b/flink-walkthroughs/pom.xml
@@ -36,6 +36,8 @@ under the License.
 		<module>flink-walkthrough-common</module>
 		<module>flink-walkthrough-table-java</module>
 		<module>flink-walkthrough-table-scala</module>
+		<module>flink-walkthrough-datastream-java</module>
+		<module>flink-walkthrough-datastream-scala</module>
 	</modules>
 	<build>
 		<extensions>