You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2019/09/18 08:09:38 UTC
[flink] 01/02: [FLINK-12746][docs] Add DataStream API Walkthrough
This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit df8f9a586143bbd719b6e9f03592e02e45629a9a
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Jul 22 16:01:44 2019 -0500
[FLINK-12746][docs] Add DataStream API Walkthrough
This closes #9201.
---
docs/fig/fraud-transactions.svg | 71 ++
.../getting-started/walkthroughs/datastream_api.md | 925 +++++++++++++++++++++
.../walkthroughs/datastream_api.zh.md | 925 +++++++++++++++++++++
docs/getting-started/walkthroughs/table_api.md | 2 +-
docs/getting-started/walkthroughs/table_api.zh.md | 2 +-
flink-end-to-end-tests/run-nightly-tests.sh | 2 +
flink-end-to-end-tests/test-scripts/common.sh | 12 +
flink-end-to-end-tests/test-scripts/test_cli.sh | 11 -
...throughs.sh => test_datastream_walkthroughs.sh} | 35 +-
.../test-scripts/test_table_walkthroughs.sh | 1 +
.../flink/walkthrough/common/entity/Alert.java | 61 ++
.../flink/walkthrough/common/sink/AlertSink.java | 43 +
.../flink-walkthrough-datastream-java/pom.xml | 37 +
.../META-INF/maven/archetype-metadata.xml | 36 +
.../src/main/resources/archetype-resources/pom.xml | 225 +++++
.../src/main/java/FraudDetectionJob.java | 50 ++
.../src/main/java/FraudDetector.java | 48 ++
.../src/main/resources/log4j.properties | 24 +
.../flink-walkthrough-datastream-scala/pom.xml | 37 +
.../META-INF/maven/archetype-metadata.xml | 36 +
.../src/main/resources/archetype-resources/pom.xml | 256 ++++++
.../src/main/resources/log4j.properties | 24 +
.../src/main/scala/FraudDetectionJob.scala | 51 ++
.../src/main/scala/FraudDetector.scala | 49 ++
flink-walkthroughs/pom.xml | 2 +
25 files changed, 2945 insertions(+), 20 deletions(-)
diff --git a/docs/fig/fraud-transactions.svg b/docs/fig/fraud-transactions.svg
new file mode 100644
index 0000000..f8e59d9
--- /dev/null
+++ b/docs/fig/fraud-transactions.svg
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<svg version="1.1" viewBox="0 0 842.6483154296875 203.27821350097656" fill="none" stroke="none" stroke-linecap="square" stroke-miterlimit="10" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg" width="100%" height="100%">
+ <rect id="svgEditorBackground" x="0" y="0" width="842.648" height="203.278" style="fill:none;stroke:none;" />
+ <clipPath id="p.0">
+ <path d="m0 0l842.6483 0l0 203.27821l-842.6483 0l0 -203.27821z" clip-rule="nonzero" />
+ </clipPath>
+ <g clip-path="url(#p.0)">
+ <path fill="#000000" fill-opacity="0.0" d="M0,0l842.6483,0l0,203.27821l-842.6483,0Z" fill-rule="evenodd" />
+ <path fill="#000000" fill-opacity="0.0" d="m203.24672 147.81758l0 -46.015747l138.67717 0l0 46.015747z" fill-rule="evenodd" />
+ <path fill="#000000" d="M265.14047,136.5832q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21876499999999,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.3593673700000011,-5.265625,-1.3124923999999965q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.93751499999999, [...]
+ <path fill="#000000" d="M608.6628,138.55434q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21875,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.359375,-5.265625,-1.3125q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.9375,0.4375q4.171875,0.703125,6.4375,2.40625q2 [...]
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.850395 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m119.850395 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m195.85039 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m271.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m347.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m423.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m499.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m575.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m651.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m727.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m803.8504 19.053806l0 76.19423" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 19.552494l760.9974 0" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 54.749344l760.9974 0" fill-rule="nonzero" />
+ <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 94.749344l760.9974 0" fill-rule="nonzero" />
+ <path fill="#000000" d="m68.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 0 [...]
+ <path fill="#000000" d="m144.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...]
+ <path fill="#000000" d="m220.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...]
+ <path fill="#000000" d="m296.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...]
+ <path fill="#000000" d="m372.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...]
+ <path fill="#000000" d="m448.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...]
+ <path fill="#000000" d="m524.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...]
+ <path fill="#000000" d="m600.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...]
+ <path fill="#000000" d="m676.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...]
+ <path fill="#000000" d="m748.9306 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 0 [...]
+ <path fill="#000000" d="m64.78021 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+ <path fill="#000000" d="m140.78021 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+ <path fill="#000000" d="m220.48645 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+ <path fill="#000000" d="m289.07397 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+ <path fill="#000000" d="m365.07397 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+ <path fill="#000000" d="m444.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+ <path fill="#000000" d="m524.48645 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...]
+ <path fill="#000000" d="m596.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+ <path fill="#000000" d="m669.074 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.609375 [...]
+ <path fill="#000000" d="m748.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...]
+ <path fill="#000000" fill-opacity="0.0" d="m236.08136 158.07217l73.00787 0l0 68.28346l-73.00787 0z" fill-rule="evenodd" />
+ <path fill="#000000" d="m246.61261 184.99217l0 -13.359375l9.015625 0l0 1.578125l-7.25 0l0 4.140625l6.265625 0l0 1.578125l-6.265625 0l0 6.0625l-1.765625 0zm11.083496 0l0 -9.671875l1.46875 0l0 1.46875q0.5625 -1.03125 1.03125 -1.359375q0.484375 -0.328125 1.0625 -0.328125q0.828125 0 1.6875 0.53125l-0.5625 1.515625q-0.609375 -0.359375 -1.203125 -0.359375q-0.546875 0 -0.96875 0.328125q-0.421875 0.328125 -0.609375 0.890625q-0.28125 0.875 -0.28125 1.921875l0 5.0625l-1.625 0zm12.540802 -1 [...]
+ <path fill="#000000" fill-opacity="0.0" d="m563.58795 160.0433l105.03937 0l0 68.28346l-105.03937 0z" fill-rule="evenodd" />
+ <path fill="#000000" d="m574.0098 186.9633l0 -13.359375l1.8125 0l7.015625 10.484375l0 -10.484375l1.6875 0l0 13.359375l-1.8125 0l-7.015625 -10.5l0 10.5l-1.6875 0zm12.676025 -4.84375q0 -2.6875 1.484375 -3.96875q1.25 -1.078125 3.046875 -1.078125q2.0 0 3.265625 1.3125q1.265625 1.296875 1.265625 3.609375q0 1.859375 -0.5625 2.9375q-0.5625 1.0625 -1.640625 1.65625q-1.0625 0.59375 -2.328125 0.59375q-2.03125 0 -3.28125 -1.296875q-1.25 -1.3125 -1.25 -3.765625zm1.6875 0q0 1.859375 0.796875 [...]
+ </g>
+</svg>
diff --git a/docs/getting-started/walkthroughs/datastream_api.md b/docs/getting-started/walkthroughs/datastream_api.md
new file mode 100644
index 0000000..8676ae4
--- /dev/null
+++ b/docs/getting-started/walkthroughs/datastream_api.md
@@ -0,0 +1,925 @@
+---
+title: "DataStream API"
+nav-id: datastreamwalkthrough
+nav-title: 'DataStream API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a DataStream API for building robust, stateful streaming applications.
+It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems.
+In this step-by-step guide you'll learn how to build a stateful streaming application with Flink's DataStream API.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building?
+
+Credit card fraud is a growing concern in the digital age.
+Criminals steal credit card numbers by running scams or hacking into insecure systems.
+Stolen numbers are tested by making one or more small purchases, often for a dollar or less.
+If that works, they then make more significant purchases to get items they can sell or keep for themselves.
+
+In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions.
+Using a simple set of rules, you will see how Flink allows us to implement advanced business logic and act in real-time.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+
+## Help, I’m Stuck!
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+
+## How to Follow Along
+
+If you want to follow along, you will require a computer with:
+
+* Java 8
+* Maven
+
+A provided Flink Maven Archetype will quickly create a skeleton project with all the necessary dependencies, so you only need to focus on filling out the business logic.
+These dependencies include `flink-streaming-java` which is the core dependency for all Flink streaming applications and `flink-walkthrough-common` that has data generators and other classes specific to this walkthrough.
+
+{% panel **Note:** Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available [at the bottom of the page](#final-application). %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %}
+ -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+ -DarchetypeVersion={{ site.version }} \
+ -DgroupId=frauddetection \
+ -DartifactId=frauddetection \
+ -Dversion=0.1 \
+ -Dpackage=spendreport \
+ -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %}
+ -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+ -DarchetypeVersion={{ site.version }} \
+ -DgroupId=frauddetection \
+ -DartifactId=frauddetection \
+ -Dversion=0.1 \
+ -Dpackage=spendreport \
+ -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+ <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">official Maven documentation</a>
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
+Maven will create a folder named `frauddetection` that contains a project with all the dependencies to complete this tutorial.
+After importing the project into your editor, you can find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code which you can run directly inside your IDE.
+Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Transaction> transactions = env
+ .addSource(new TransactionSource())
+ .name("transactions");
+
+ DataStream<Alert> alerts = transactions
+ .keyBy(Transaction::getAccountId)
+ .process(new FraudDetector())
+ .name("fraud-detector");
+
+ alerts
+ .addSink(new AlertSink())
+ .name("send-alerts");
+
+ env.execute("Fraud Detection");
+ }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final double SMALL_AMOUNT = 1.00;
+ private static final double LARGE_AMOUNT = 500.00;
+ private static final long ONE_MINUTE = 60 * 1000;
+
+ @Override
+ public void processElement(
+ Transaction transaction,
+ Context context,
+ Collector<Alert> collector) throws Exception {
+
+ Alert alert = new Alert();
+ alert.setId(transaction.getAccountId());
+
+ collector.collect(alert);
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+#### FraudDetectionJob.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+object FraudDetectionJob {
+
+ @throws[Exception]
+ def main(args: Array[String]): Unit = {
+ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val transactions: DataStream[Transaction] = env
+ .addSource(new TransactionSource)
+ .name("transactions")
+
+ val alerts: DataStream[Alert] = transactions
+ .keyBy(transaction => transaction.getAccountId)
+ .process(new FraudDetector)
+ .name("fraud-detector")
+
+ alerts
+ .addSink(new AlertSink)
+ .name("send-alerts")
+
+ env.execute("Fraud Detection")
+ }
+}
+{% endhighlight %}
+
+#### FraudDetector.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+ val SMALL_AMOUNT: Double = 1.00
+ val LARGE_AMOUNT: Double = 500.00
+ val ONE_MINUTE: Long = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @throws[Exception]
+ def processElement(
+ transaction: Transaction,
+ context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+ collector: Collector[Alert]): Unit = {
+
+ val alert = new Alert
+ alert.setId(transaction.getAccountId)
+
+ collector.collect(alert)
+ }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down the Code
+
+Let's walk step-by-step through the code of these two files. The `FraudDetectionJob` class defines the data flow of the application and the `FraudDetector` class defines the business logic of the function that detects fraudulent transactions.
+
+We start describing how the Job is assembled in the `main` method of the `FraudDetectionJob` class.
+
+#### The Execution Environment
+
+The first line sets up your `StreamExecutionEnvironment`.
+The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+{% endhighlight %}
+</div>
+</div>
+
+#### Creating a Source
+
+Sources ingest data from external systems, such as Apache Kafka, Rabbit MQ, or Apache Pulsar, into Flink Jobs.
+This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process.
+Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) of when the transaction occurred, and US$ amount (`amount`).
+The `name` attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+ .addSource(new TransactionSource())
+ .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions: DataStream[Transaction] = env
+ .addSource(new TransactionSource)
+ .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Partitioning Events & Detecting Fraud
+
+The `transactions` stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.
+
+To ensure that the same physical task processes all records for a particular key, you can partition a stream using `DataStream#keyBy`.
+The `process()` call adds an operator that applies a function to each partitioned element in the stream.
+It is common to say the operator immediately after a `keyBy`, in this case `FraudDetector`, is executed within a _keyed context_.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Alert> alerts = transactions
+ .keyBy(Transaction::getAccountId)
+ .process(new FraudDetector())
+ .name("fraud-detector");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val alerts: DataStream[Alert] = transactions
+ .keyBy(transaction => transaction.getAccountId)
+ .process(new FraudDetector)
+ .name("fraud-detector")
+{% endhighlight %}
+</div>
+</div>
+
+#### Outputting Results
+
+A sink writes a `DataStream` to an external system, such as Apache Kafka, Cassandra, or AWS Kinesis.
+The `AlertSink` logs each `Alert` record with log level **INFO**, instead of writing it to persistent storage, so you can easily see your results.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+alerts.addSink(new AlertSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+alerts.addSink(new AlertSink)
+{% endhighlight %}
+</div>
+</div>
+
+#### Executing the Job
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+Call `StreamExecutionEnvironment#execute` to begin the execution of our Job and give it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Fraud Detection");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Fraud Detection")
+{% endhighlight %}
+</div>
+</div>
+
+#### The Fraud Detector
+
+The fraud detector is implemented as a `KeyedProcessFunction`.
+Its method `KeyedProcessFunction#processElement` is called for every transaction event.
+This first version produces an alert on every transaction, which some may say is overly conservative.
+
+The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+ private static final double SMALL_AMOUNT = 1.00;
+ private static final double LARGE_AMOUNT = 500.00;
+ private static final long ONE_MINUTE = 60 * 1000;
+
+ @Override
+ public void processElement(
+ Transaction transaction,
+ Context context,
+ Collector<Alert> collector) throws Exception {
+
+ Alert alert = new Alert();
+ alert.setId(transaction.getAccountId());
+
+ collector.collect(alert);
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+object FraudDetector {
+ val SMALL_AMOUNT: Double = 1.00
+ val LARGE_AMOUNT: Double = 500.00
+ val ONE_MINUTE: Long = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @throws[Exception]
+ def processElement(
+ transaction: Transaction,
+ context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+ collector: Collector[Alert]): Unit = {
+
+ val alert = new Alert
+ alert.setId(transaction.getAccountId)
+
+ collector.collect(alert)
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Writing a Real Application (v1)
+
+For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one. Where small is anything less than $1.00 and large is more than $500.
+Imagine your fraud detector processes the following stream of transactions for a particular account.
+
+<p class="text-center">
+ <img alt="Transactions" width="80%" src="{{ site.baseurl }}/fig/fraud-transactions.svg"/>
+</p>
+
+Transactions 3 and 4 should be marked as fraudulent because a small transaction, $0.09, is immediately followed by a large one, $510.
+Alternatively, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl }}/concepts/glossary.html#managed-state), and that is why we decided to use a [KeyedProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html).
+It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is set for that account.
+
+However, merely implementing the flag as a member variable in the `FraudDetector` class will not work.
+Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means if accounts A and B are routed through the same instance of `FraudDetector`, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert.
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys, however, a simple member variable would not be fault-tolerant and all its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
+
+To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.
+
+The most basic type of state in Flink is [ValueState]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state), a data type that adds fault tolerance to any variable it wraps.
+`ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_; any operator immediately following `DataStream#keyBy`.
+A _keyed state_ of an operator is automatically scoped to the key of the record that is currently processed.
+In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account.
+`ValueState` is created using a `ValueStateDescriptor` which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data.
+The right hook for this is the `open()` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient ValueState<Boolean> flagState;
+
+ @Override
+ public void open(Configuration parameters) {
+ ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+ "flag",
+ Types.BOOLEAN);
+ flagState = getRuntimeContext().getState(flagDescriptor);
+ }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @transient private var flagState: ValueState[java.lang.Boolean] = _
+
+ @throws[Exception]
+ override def open(parameters: Configuration): Unit = {
+ val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+ flagState = getRuntimeContext.getState(flagDescriptor)
+ }
+{% endhighlight %}
+</div>
+</div>
+
+`ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` in the Java standard library.
+It provides three methods for interacting with its contents: `update` sets the state, `value` gets the current value, and `clear` deletes its contents.
+If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`.
+Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, and so all changes must be performed with `ValueState#update`.
+Otherwise, fault tolerance is managed automatically by Flink under the hood, and so you can interact with it like with any standard variable.
+
+Below, you can see an example of how you can use a flag state to track potential fraudulent transactions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ @Override
+ public void processElement(
+ Transaction transaction,
+ Context context,
+ Collector<Alert> collector) throws Exception {
+
+ // Get the current state for the current key
+ Boolean lastTransactionWasSmall = flagState.value();
+
+ // Check if the flag is set
+ if (lastTransactionWasSmall != null) {
+ if (transaction.getAmount() > LARGE_AMOUNT) {
+ // Output an alert downstream
+ Alert alert = new Alert();
+ alert.setId(transaction.getAccountId());
+
+ collector.collect(alert);
+ }
+
+ // Clean up our state
+ flagState.clear();
+ }
+
+ if (transaction.getAmount() < SMALL_AMOUNT) {
+ // Set the flag to true
+ flagState.update(true);
+ }
+ }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+ override def processElement(
+ transaction: Transaction,
+ context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+ collector: Collector[Alert]): Unit = {
+
+ // Get the current state for the current key
+ val lastTransactionWasSmall = flagState.value
+
+ // Check if the flag is set
+ if (lastTransactionWasSmall != null) {
+ if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+ // Output an alert downstream
+ val alert = new Alert
+ alert.setId(transaction.getAccountId)
+
+ collector.collect(alert)
+ }
+ // Clean up our state
+ flagState.clear()
+ }
+
+ if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true)
+ }
+ }
+{% endhighlight %}
+</div>
+</div>
+
+For every transaction, the fraud detector checks the state of the flag for that account.
+Remember, `ValueState` is always scoped to the current key, i.e., account.
+If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert.
+
+After that check, the flag state is unconditionally cleared.
+Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted.
+
+Finally, the transaction amount is checked to see if it is small.
+If so, then the flag is set so that it can be checked by the next event.
+Notice that `ValueState<Boolean>` actually has three states, unset ( `null`), `true`, and `false`, because all `ValueState`'s are nullable.
+This job only makes use of unset ( `null`) and `true` to check whether the flag is set or not.
+
+## Fraud Detector v2: State + Time = ❤️
+
+Scammers don't wait long to make their large purchase to reduce the chances their test transaction is noticed.
+For example, suppose you wanted to set a 1 minute timeout to your fraud detector; i.e., in the previous example transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other.
+Flink's `KeyedProcessFunction` allows you to set timers which invoke a callback method at some point in time in the future.
+
+Let's see how we can modify our Job to comply with our new requirements:
+
+* Whenever the flag is set to `true`, also set a timer for 1 minute in the future.
+* When the timer fires, reset the flag by clearing its state.
+* If the flag is ever cleared the timer should be canceled.
+
+To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ private transient ValueState<Boolean> flagState;
+ private transient ValueState<Long> timerState;
+
+ @Override
+ public void open(Configuration parameters) {
+ ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+ "flag",
+ Types.BOOLEAN);
+ flagState = getRuntimeContext().getState(flagDescriptor);
+
+ ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+ "timer-state",
+ Types.LONG);
+ timerState = getRuntimeContext().getState(timerDescriptor);
+ }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @transient private var flagState: ValueState[java.lang.Boolean] = _
+ @transient private var timerState: ValueState[java.lang.Long] = _
+
+ @throws[Exception]
+ override def open(parameters: Configuration): Unit = {
+ val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+ flagState = getRuntimeContext.getState(flagDescriptor)
+
+ val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+ timerState = getRuntimeContext.getState(timerDescriptor)
+ }
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service.
+The timer service can be used to query the current time, register timers, and delete timers.
+With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+if (transaction.getAmount() < SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true);
+
+ // set the timer and timer state
+ long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+ context.timerService().registerProcessingTimeTimer(timer);
+ timerState.update(timer);
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true)
+
+ // set the timer and timer state
+ val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+ context.timerService.registerProcessingTimeTimer(timer)
+ timerState.update(timer)
+}
+{% endhighlight %}
+</div>
+</div>
+
+Processing time is wall clock time, and is determined by the system clock of the machine running the operator.
+
+When a timer fires, it calls `KeyedProcessFunction#onTimer`.
+Overriding this method is how you can implement your callback to reset the flag.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+ // remove flag after 1 minute
+ timerState.clear();
+ flagState.clear();
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(
+ timestamp: Long,
+ ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+ out: Collector[Alert]): Unit = {
+ // remove flag after 1 minute
+ timerState.clear()
+ flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.
+You can wrap this in a helper method and call this method instead of `flagState.clear()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+private void cleanUp(Context ctx) throws Exception {
+ // delete timer
+ Long timer = timerState.value();
+ ctx.timerService().deleteProcessingTimeTimer(timer);
+
+ // clean up all state
+ timerState.clear();
+ flagState.clear();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@throws[Exception]
+private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+ // delete timer
+ val timer = timerState.value
+ ctx.timerService.deleteProcessingTimeTimer(timer)
+
+ // clean up all states
+ timerState.clear()
+ flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final double SMALL_AMOUNT = 1.00;
+ private static final double LARGE_AMOUNT = 500.00;
+ private static final long ONE_MINUTE = 60 * 1000;
+
+ private transient ValueState<Boolean> flagState;
+ private transient ValueState<Long> timerState;
+
+ @Override
+ public void open(Configuration parameters) {
+ ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+ "flag",
+ Types.BOOLEAN);
+ flagState = getRuntimeContext().getState(flagDescriptor);
+
+ ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+ "timer-state",
+ Types.LONG);
+ timerState = getRuntimeContext().getState(timerDescriptor);
+ }
+
+ @Override
+ public void processElement(
+ Transaction transaction,
+ Context context,
+ Collector<Alert> collector) throws Exception {
+
+ // Get the current state for the current key
+ Boolean lastTransactionWasSmall = flagState.value();
+
+ // Check if the flag is set
+ if (lastTransactionWasSmall != null) {
+ if (transaction.getAmount() > LARGE_AMOUNT) {
+ //Output an alert downstream
+ Alert alert = new Alert();
+ alert.setId(transaction.getAccountId());
+
+ collector.collect(alert);
+ }
+ // Clean up our state
+ cleanUp(context);
+ }
+
+ if (transaction.getAmount() < SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true);
+
+ long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+ context.timerService().registerProcessingTimeTimer(timer);
+
+ timerState.update(timer);
+ }
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+ // remove flag after 1 minute
+ timerState.clear();
+ flagState.clear();
+ }
+
+ private void cleanUp(Context ctx) throws Exception {
+ // delete timer
+ Long timer = timerState.value();
+ ctx.timerService().deleteProcessingTimeTimer(timer);
+
+ // clean up all state
+ timerState.clear();
+ flagState.clear();
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+ val SMALL_AMOUNT: Double = 1.00
+ val LARGE_AMOUNT: Double = 500.00
+ val ONE_MINUTE: Long = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @transient private var flagState: ValueState[java.lang.Boolean] = _
+ @transient private var timerState: ValueState[java.lang.Long] = _
+
+ @throws[Exception]
+ override def open(parameters: Configuration): Unit = {
+ val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+ flagState = getRuntimeContext.getState(flagDescriptor)
+
+ val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+ timerState = getRuntimeContext.getState(timerDescriptor)
+ }
+
+ override def processElement(
+ transaction: Transaction,
+ context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+ collector: Collector[Alert]): Unit = {
+
+ // Get the current state for the current key
+ val lastTransactionWasSmall = flagState.value
+
+ // Check if the flag is set
+ if (lastTransactionWasSmall != null) {
+ if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+ // Output an alert downstream
+ val alert = new Alert
+ alert.setId(transaction.getAccountId)
+
+ collector.collect(alert)
+ }
+ // Clean up our state
+ cleanUp(context)
+ }
+
+ if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true)
+ val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+
+ context.timerService.registerProcessingTimeTimer(timer)
+ timerState.update(timer)
+ }
+ }
+
+ override def onTimer(
+ timestamp: Long,
+ ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+ out: Collector[Alert]): Unit = {
+ // remove flag after 1 minute
+ timerState.clear()
+ flagState.clear()
+ }
+
+ @throws[Exception]
+ private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+ // delete timer
+ val timer = timerState.value
+ ctx.timerService.deleteProcessingTimeTimer(timer)
+
+ // clean up all states
+ timerState.clear()
+ flagState.clear()
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Expected Output
+
+Running this code with the provided `TransactionSource` will emit fraud alerts for account 3.
+You should see the following output in your task manager logs:
+
+{% highlight bash %}
+2019-08-19 14:22:06,220 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+2019-08-19 14:22:11,383 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+2019-08-19 14:22:16,551 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+2019-08-19 14:22:21,723 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+2019-08-19 14:22:26,896 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+{% endhighlight %}
diff --git a/docs/getting-started/walkthroughs/datastream_api.zh.md b/docs/getting-started/walkthroughs/datastream_api.zh.md
new file mode 100644
index 0000000..8676ae4
--- /dev/null
+++ b/docs/getting-started/walkthroughs/datastream_api.zh.md
@@ -0,0 +1,925 @@
+---
+title: "DataStream API"
+nav-id: datastreamwalkthrough
+nav-title: 'DataStream API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a DataStream API for building robust, stateful streaming applications.
+It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems.
+In this step-by-step guide you'll learn how to build a stateful streaming application with Flink's DataStream API.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building?
+
+Credit card fraud is a growing concern in the digital age.
+Criminals steal credit card numbers by running scams or hacking into insecure systems.
+Stolen numbers are tested by making one or more small purchases, often for a dollar or less.
+If that works, they then make more significant purchases to get items they can sell or keep for themselves.
+
+In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions.
+Using a simple set of rules, you will see how Flink allows us to implement advanced business logic and act in real-time.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+
+## Help, I’m Stuck!
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+
+## How to Follow Along
+
+If you want to follow along, you will require a computer with:
+
+* Java 8
+* Maven
+
+A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly, so you only need to focus on filling out the business logic.
+These dependencies include `flink-streaming-java` which is the core dependency for all Flink streaming applications and `flink-walkthrough-common` that has data generators and other classes specific to this walkthrough.
+
+{% panel **Note:** Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available [at the bottom of the page](#final-application). %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %}
+ -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+ -DarchetypeVersion={{ site.version }} \
+ -DgroupId=frauddetection \
+ -DartifactId=frauddetection \
+ -Dversion=0.1 \
+ -Dpackage=spendreport \
+ -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %}
+ -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+ -DarchetypeVersion={{ site.version }} \
+ -DgroupId=frauddetection \
+ -DartifactId=frauddetection \
+ -Dversion=0.1 \
+ -Dpackage=spendreport \
+ -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+ <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a>
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
+Maven will create a folder named `frauddetection` that contains a project with all the dependencies to complete this tutorial.
+After importing the project into your editor, you can find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code which you can run directly inside your IDE.
+Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Transaction> transactions = env
+ .addSource(new TransactionSource())
+ .name("transactions");
+
+ DataStream<Alert> alerts = transactions
+ .keyBy(Transaction::getAccountId)
+ .process(new FraudDetector())
+ .name("fraud-detector");
+
+ alerts
+ .addSink(new AlertSink())
+ .name("send-alerts");
+
+ env.execute("Fraud Detection");
+ }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final double SMALL_AMOUNT = 1.00;
+ private static final double LARGE_AMOUNT = 500.00;
+ private static final long ONE_MINUTE = 60 * 1000;
+
+ @Override
+ public void processElement(
+ Transaction transaction,
+ Context context,
+ Collector<Alert> collector) throws Exception {
+
+ Alert alert = new Alert();
+ alert.setId(transaction.getAccountId());
+
+ collector.collect(alert);
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+#### FraudDetectionJob.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+object FraudDetectionJob {
+
+ @throws[Exception]
+ def main(args: Array[String]): Unit = {
+ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val transactions: DataStream[Transaction] = env
+ .addSource(new TransactionSource)
+ .name("transactions")
+
+ val alerts: DataStream[Alert] = transactions
+ .keyBy(transaction => transaction.getAccountId)
+ .process(new FraudDetector)
+ .name("fraud-detector")
+
+ alerts
+ .addSink(new AlertSink)
+ .name("send-alerts")
+
+ env.execute("Fraud Detection")
+ }
+}
+{% endhighlight %}
+
+#### FraudDetector.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+ val SMALL_AMOUNT: Double = 1.00
+ val LARGE_AMOUNT: Double = 500.00
+ val ONE_MINUTE: Long = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @throws[Exception]
+ def processElement(
+ transaction: Transaction,
+ context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+ collector: Collector[Alert]): Unit = {
+
+ val alert = new Alert
+ alert.setId(transaction.getAccountId)
+
+ collector.collect(alert)
+ }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down the Code
+
+Let's walk step-by-step through the code of these two files. The `FraudDetectionJob` class defines the data flow of the application and the `FraudDetector` class defines the business logic of the function that detects fraudulent transactions.
+
+We start describing how the Job is assembled in the `main` method of the `FraudDetectionJob` class.
+
+#### The Execution Environment
+
+The first line sets up your `StreamExecutionEnvironment`.
+The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+{% endhighlight %}
+</div>
+</div>
+
+#### Creating a Source
+
+Sources ingest data from external systems, such as Apache Kafka, Rabbit MQ, or Apache Pulsar, into Flink Jobs.
+This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process.
+Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) of when the transaction occurred, and US$ amount (`amount`).
+The `name` attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+ .addSource(new TransactionSource())
+ .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions: DataStream[Transaction] = env
+ .addSource(new TransactionSource)
+ .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Partitioning Events & Detecting Fraud
+
+The `transactions` stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.
+
+To ensure that the same physical task processes all records for a particular key, you can partition a stream using `DataStream#keyBy`.
+The `process()` call adds an operator that applies a function to each partitioned element in the stream.
+It is common to say the operator immediately after a `keyBy`, in this case `FraudDetector`, is executed within a _keyed context_.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Alert> alerts = transactions
+ .keyBy(Transaction::getAccountId)
+ .process(new FraudDetector())
+ .name("fraud-detector");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val alerts: DataStream[Alert] = transactions
+ .keyBy(transaction => transaction.getAccountId)
+ .process(new FraudDetector)
+ .name("fraud-detector")
+{% endhighlight %}
+</div>
+</div>
+
+#### Outputting Results
+
+A sink writes a `DataStream` to an external system, such as Apache Kafka, Cassandra, or AWS Kinesis.
+The `AlertSink` logs each `Alert` record with log level **INFO**, instead of writing it to persistent storage, so you can easily see your results.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+alerts.addSink(new AlertSink());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+alerts.addSink(new AlertSink)
+{% endhighlight %}
+</div>
+</div>
+
+#### Executing the Job
+
+Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
+Call `StreamExecutionEnvironment#execute` to begin the execution of our Job and give it a name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.execute("Fraud Detection");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.execute("Fraud Detection")
+{% endhighlight %}
+</div>
+</div>
+
+#### The Fraud Detector
+
+The fraud detector is implemented as a `KeyedProcessFunction`.
+Its method `KeyedProcessFunction#processElement` is called for every transaction event.
+This first version produces an alert on every transaction, which some may say is overly conservative.
+
+The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+ private static final double SMALL_AMOUNT = 1.00;
+ private static final double LARGE_AMOUNT = 500.00;
+ private static final long ONE_MINUTE = 60 * 1000;
+
+ @Override
+ public void processElement(
+ Transaction transaction,
+ Context context,
+ Collector<Alert> collector) throws Exception {
+
+ Alert alert = new Alert();
+ alert.setId(transaction.getAccountId());
+
+ collector.collect(alert);
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+object FraudDetector {
+ val SMALL_AMOUNT: Double = 1.00
+ val LARGE_AMOUNT: Double = 500.00
+ val ONE_MINUTE: Long = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @throws[Exception]
+ def processElement(
+ transaction: Transaction,
+ context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+ collector: Collector[Alert]): Unit = {
+
+ val alert = new Alert
+ alert.setId(transaction.getAccountId)
+
+ collector.collect(alert)
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Writing a Real Application (v1)
+
+For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one. Where small is anything less than $1.00 and large is more than $500.
+Imagine your fraud detector processes the following stream of transactions for a particular account.
+
+<p class="text-center">
+ <img alt="Transactions" width="80%" src="{{ site.baseurl }}/fig/fraud-transactions.svg"/>
+</p>
+
+Transactions 3 and 4 should be marked as fraudulent because a small transaction, $0.09, is immediately followed by a large one, $510.
+Alternatively, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl }}/concepts/glossary.html#managed-state), and that is why we decided to use a [KeyedProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html).
+It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is set for that account.
+
+However, merely implementing the flag as a member variable in the `FraudDetector` class will not work.
+Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means if accounts A and B are routed through the same instance of `FraudDetector`, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert.
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys, however, a simple member variable would not be fault-tolerant and all its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
+
+To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.
+
+The most basic type of state in Flink is [ValueState]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state), a data type that adds fault tolerance to any variable it wraps.
+`ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_; any operator immediately following `DataStream#keyBy`.
+A _keyed state_ of an operator is automatically scoped to the key of the record that is currently processed.
+In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account.
+`ValueState` is created using a `ValueStateDescriptor` which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data.
+The right hook for this is the `open()` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient ValueState<Boolean> flagState;
+
+ @Override
+ public void open(Configuration parameters) {
+ ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+ "flag",
+ Types.BOOLEAN);
+ flagState = getRuntimeContext().getState(flagDescriptor);
+ }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @transient private var flagState: ValueState[java.lang.Boolean] = _
+
+ @throws[Exception]
+ override def open(parameters: Configuration): Unit = {
+ val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+ flagState = getRuntimeContext.getState(flagDescriptor)
+ }
+{% endhighlight %}
+</div>
+</div>
+
+`ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` in the Java standard library.
+It provides three methods for interacting with its contents; `update` sets the state, `value` gets the current value, and `clear` deletes its contents.
+If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`.
+Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, and so all changes must be performed with `ValueState#update`.
+Otherwise, fault tolerance is managed automatically by Flink under the hood, and so you can interact with it like with any standard variable.
+
+Below, you can see an example of how you can use a flag state to track potential fraudulent transactions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ @Override
+ public void processElement(
+ Transaction transaction,
+ Context context,
+ Collector<Alert> collector) throws Exception {
+
+ // Get the current state for the current key
+ Boolean lastTransactionWasSmall = flagState.value();
+
+ // Check if the flag is set
+ if (lastTransactionWasSmall != null) {
+ if (transaction.getAmount() > LARGE_AMOUNT) {
+ // Output an alert downstream
+ Alert alert = new Alert();
+ alert.setId(transaction.getAccountId());
+
+ collector.collect(alert);
+ }
+
+ // Clean up our state
+ flagState.clear();
+ }
+
+ if (transaction.getAmount() < SMALL_AMOUNT) {
+ // Set the flag to true
+ flagState.update(true);
+ }
+ }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+ override def processElement(
+ transaction: Transaction,
+ context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+ collector: Collector[Alert]): Unit = {
+
+ // Get the current state for the current key
+ val lastTransactionWasSmall = flagState.value
+
+ // Check if the flag is set
+ if (lastTransactionWasSmall != null) {
+ if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+ // Output an alert downstream
+ val alert = new Alert
+ alert.setId(transaction.getAccountId)
+
+ collector.collect(alert)
+ }
+ // Clean up our state
+ flagState.clear()
+ }
+
+ if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true)
+ }
+ }
+{% endhighlight %}
+</div>
+</div>
+
+For every transaction, the fraud detector checks the state of the flag for that account.
+Remember, `ValueState` is always scoped to the current key, i.e., account.
+If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert.
+
+After that check, the flag state is unconditionally cleared.
+Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted.
+
+Finally, the transaction amount is checked to see if it is small.
+If so, then the flag is set so that it can be checked by the next event.
+Notice that `ValueState<Boolean>` actually has three states, unset (`null`), `true`, and `false`, because all `ValueState`'s are nullable.
+This job only makes use of unset (`null`) and `true` to check whether the flag is set or not.
+
+## Fraud Detector v2: State + Time = ❤️
+
+Scammers don't wait long to make their large purchase to reduce the chances their test transaction is noticed.
+For example, suppose you wanted to set a 1 minute timeout to your fraud detector; i.e., in the previous example transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other.
+Flink's `KeyedProcessFunction` allows you to set timers which invoke a callback method at some point in time in the future.
+
+Let's see how we can modify our Job to comply with our new requirements:
+
+* Whenever the flag is set to `true`, also set a timer for 1 minute in the future.
+* When the timer fires, reset the flag by clearing its state.
+* If the flag is ever cleared the timer should be canceled.
+
+To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ private transient ValueState<Boolean> flagState;
+ private transient ValueState<Long> timerState;
+
+ @Override
+ public void open(Configuration parameters) {
+ ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+ "flag",
+ Types.BOOLEAN);
+ flagState = getRuntimeContext().getState(flagDescriptor);
+
+ ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+ "timer-state",
+ Types.LONG);
+ timerState = getRuntimeContext().getState(timerDescriptor);
+ }
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @transient private var flagState: ValueState[java.lang.Boolean] = _
+ @transient private var timerState: ValueState[java.lang.Long] = _
+
+ @throws[Exception]
+ override def open(parameters: Configuration): Unit = {
+ val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+ flagState = getRuntimeContext.getState(flagDescriptor)
+
+ val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+ timerState = getRuntimeContext.getState(timerDescriptor)
+ }
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service.
+The timer service can be used to query the current time, register timers, and delete timers.
+With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+if (transaction.getAmount() < SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true);
+
+ // set the timer and timer state
+ long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+ context.timerService().registerProcessingTimeTimer(timer);
+ timerState.update(timer);
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true)
+
+ // set the timer and timer state
+ val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+ context.timerService.registerProcessingTimeTimer(timer)
+ timerState.update(timer)
+}
+{% endhighlight %}
+</div>
+</div>
+
+Processing time is wall clock time, and is determined by the system clock of the machine running the operator.
+
+When a timer fires, it calls `KeyedProcessFunction#onTimer`.
+Overriding this method is how you can implement your callback to reset the flag.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+ // remove flag after 1 minute
+ timerState.clear();
+ flagState.clear();
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(
+ timestamp: Long,
+ ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+ out: Collector[Alert]): Unit = {
+ // remove flag after 1 minute
+ timerState.clear()
+ flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.
+You can wrap this in a helper method and call this method instead of `flagState.clear()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+private void cleanUp(Context ctx) throws Exception {
+ // delete timer
+ Long timer = timerState.value();
+ ctx.timerService().deleteProcessingTimeTimer(timer);
+
+ // clean up all state
+ timerState.clear();
+ flagState.clear();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@throws[Exception]
+private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+ // delete timer
+ val timer = timerState.value
+ ctx.timerService.deleteProcessingTimeTimer(timer)
+
+ // clean up all states
+ timerState.clear()
+ flagState.clear()
+}
+{% endhighlight %}
+</div>
+</div>
+
+And that's it, a fully functional, stateful, distributed streaming application!
+
+## Final Application
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final double SMALL_AMOUNT = 1.00;
+ private static final double LARGE_AMOUNT = 500.00;
+ private static final long ONE_MINUTE = 60 * 1000;
+
+ private transient ValueState<Boolean> flagState;
+ private transient ValueState<Long> timerState;
+
+ @Override
+ public void open(Configuration parameters) {
+ ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+ "flag",
+ Types.BOOLEAN);
+ flagState = getRuntimeContext().getState(flagDescriptor);
+
+ ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+ "timer-state",
+ Types.LONG);
+ timerState = getRuntimeContext().getState(timerDescriptor);
+ }
+
+ @Override
+ public void processElement(
+ Transaction transaction,
+ Context context,
+ Collector<Alert> collector) throws Exception {
+
+ // Get the current state for the current key
+ Boolean lastTransactionWasSmall = flagState.value();
+
+ // Check if the flag is set
+ if (lastTransactionWasSmall != null) {
+ if (transaction.getAmount() > LARGE_AMOUNT) {
+ //Output an alert downstream
+ Alert alert = new Alert();
+ alert.setId(transaction.getAccountId());
+
+ collector.collect(alert);
+ }
+ // Clean up our state
+ cleanUp(context);
+ }
+
+ if (transaction.getAmount() < SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true);
+
+ long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+ context.timerService().registerProcessingTimeTimer(timer);
+
+ timerState.update(timer);
+ }
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+ // remove flag after 1 minute
+ timerState.clear();
+ flagState.clear();
+ }
+
+ private void cleanUp(Context ctx) throws Exception {
+ // delete timer
+ Long timer = timerState.value();
+ ctx.timerService().deleteProcessingTimeTimer(timer);
+
+ // clean up all state
+ timerState.clear();
+ flagState.clear();
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+ val SMALL_AMOUNT: Double = 1.00
+ val LARGE_AMOUNT: Double = 500.00
+ val ONE_MINUTE: Long = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+ @transient private var flagState: ValueState[java.lang.Boolean] = _
+ @transient private var timerState: ValueState[java.lang.Long] = _
+
+ @throws[Exception]
+ override def open(parameters: Configuration): Unit = {
+ val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+ flagState = getRuntimeContext.getState(flagDescriptor)
+
+ val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+ timerState = getRuntimeContext.getState(timerDescriptor)
+ }
+
+ override def processElement(
+ transaction: Transaction,
+ context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+ collector: Collector[Alert]): Unit = {
+
+ // Get the current state for the current key
+ val lastTransactionWasSmall = flagState.value
+
+ // Check if the flag is set
+ if (lastTransactionWasSmall != null) {
+ if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+ // Output an alert downstream
+ val alert = new Alert
+ alert.setId(transaction.getAccountId)
+
+ collector.collect(alert)
+ }
+ // Clean up our state
+ cleanUp(context)
+ }
+
+ if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+ // set the flag to true
+ flagState.update(true)
+ val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+
+ context.timerService.registerProcessingTimeTimer(timer)
+ timerState.update(timer)
+ }
+ }
+
+ override def onTimer(
+ timestamp: Long,
+ ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+ out: Collector[Alert]): Unit = {
+ // remove flag after 1 minute
+ timerState.clear()
+ flagState.clear()
+ }
+
+ @throws[Exception]
+ private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+ // delete timer
+ val timer = timerState.value
+ ctx.timerService.deleteProcessingTimeTimer(timer)
+
+ // clean up all states
+ timerState.clear()
+ flagState.clear()
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Expected Output
+
+Running this code with the provided `TransactionSource` will emit fraud alerts for account 3.
+You should see the following output in your task manager logs:
+
+{% highlight bash %}
+2019-08-19 14:22:06,220 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+2019-08-19 14:22:11,383 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+2019-08-19 14:22:16,551 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+2019-08-19 14:22:21,723 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+2019-08-19 14:22:26,896 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
+{% endhighlight %}
diff --git a/docs/getting-started/walkthroughs/table_api.md b/docs/getting-started/walkthroughs/table_api.md
index 878fb54..f0cbb62 100644
--- a/docs/getting-started/walkthroughs/table_api.md
+++ b/docs/getting-started/walkthroughs/table_api.md
@@ -3,7 +3,7 @@ title: "Table API"
nav-id: tableapiwalkthrough
nav-title: 'Table API'
nav-parent_id: walkthroughs
-nav-pos: 1
+nav-pos: 2
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/getting-started/walkthroughs/table_api.zh.md b/docs/getting-started/walkthroughs/table_api.zh.md
index 878fb54..f0cbb62 100644
--- a/docs/getting-started/walkthroughs/table_api.zh.md
+++ b/docs/getting-started/walkthroughs/table_api.zh.md
@@ -3,7 +3,7 @@ title: "Table API"
nav-id: tableapiwalkthrough
nav-title: 'Table API'
nav-parent_id: walkthroughs
-nav-pos: 1
+nav-pos: 2
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 3b34b560..9307188 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -140,6 +140,8 @@ run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scrip
run_test "Walkthrough Table Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh java"
run_test "Walkthrough Table Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh scala"
+run_test "Walkthrough DataStream Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh java"
+run_test "Walkthrough DataStream Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh scala"
run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_gcp_pubsub.sh"
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 2dc3787..bdecb32 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -745,3 +745,15 @@ function retry_times() {
return 1
}
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+
+function extract_job_id_from_job_submission_return() {
+ if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+ then
+ JOB_ID="${BASH_REMATCH[1]}";
+ else
+ JOB_ID=""
+ fi
+ echo "$JOB_ID"
+}
+
diff --git a/flink-end-to-end-tests/test-scripts/test_cli.sh b/flink-end-to-end-tests/test-scripts/test_cli.sh
index b9d285b..4b04890 100755
--- a/flink-end-to-end-tests/test-scripts/test_cli.sh
+++ b/flink-end-to-end-tests/test-scripts/test_cli.sh
@@ -29,23 +29,12 @@ $FLINK_DIR/bin/taskmanager.sh start
$FLINK_DIR/bin/taskmanager.sh start
# CLI regular expressions
-JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\""
JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\""
JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :"
EXIT_CODE=0
-function extract_job_id_from_job_submission_return() {
- if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
- then
- JOB_ID="${BASH_REMATCH[1]}";
- else
- JOB_ID=""
- fi
- echo "$JOB_ID"
-}
-
function extract_valid_pact_from_job_info_return() {
PACT_MATCH=0
if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]];
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
similarity index 75%
copy from flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
copy to flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
index 77afc58..e976927 100755
--- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
+++ b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh
@@ -19,7 +19,7 @@
# End to end test for quick starts test.
# Usage:
-# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh <Type (java or scala)>
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh <Type (java or scala)>
source "$(dirname "$0")"/common.sh
@@ -28,12 +28,12 @@ TEST_TYPE=$1
mkdir -p "${TEST_DATA_DIR}"
cd "${TEST_DATA_DIR}"
-ARTIFACT_ID=flink-walkthrough-table-${TEST_TYPE}
+ARTIFACT_ID=flink-walkthrough-datastream-${TEST_TYPE}
ARTIFACT_VERSION=0.1
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
- -DarchetypeArtifactId=flink-walkthrough-table-${TEST_TYPE} \
+ -DarchetypeArtifactId=flink-walkthrough-datastream-${TEST_TYPE} \
-DarchetypeVersion=${FLINK_VERSION} \
-DgroupId=org.apache.flink.walkthrough \
-DartifactId=${ARTIFACT_ID} \
@@ -46,7 +46,8 @@ cd "${ARTIFACT_ID}"
mvn clean package -nsu > compile-output.txt
if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
- echo "Failure: The walk-through did not successfully compile"
+ echo "Failure: The walkthrough did not successfully compile"
+ cat compile-output.txt
exit 1
fi
@@ -67,8 +68,28 @@ fi
TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar
-add_optional_lib "table"
-
start_cluster
-${FLINK_DIR}/bin/flink run "$TEST_PROGRAM_JAR"
+JOB_ID=""
+EXIT_CODE=0
+
+RETURN=`$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR`
+echo "$RETURN"
+JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"`
+EXIT_CODE=$? # expect matching job id extraction
+
+if [ $EXIT_CODE == 0 ]; then
+ RETURN=`$FLINK_DIR/bin/flink list -r`
+ echo "$RETURN"
+ if [[ `grep -c "$JOB_ID" "$RETURN"` -eq '1' ]]; then # expect match for running job
+ echo "[FAIL] Unable to submit walkthrough"
+ EXIT_CODE=1
+ fi
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+ eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}"
+ EXIT_CODE=$?
+fi
+
+exit $EXIT_CODE
diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
index 77afc58..f53e972 100755
--- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
+++ b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh
@@ -47,6 +47,7 @@ mvn clean package -nsu > compile-output.txt
if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then
echo "Failure: The walk-through did not successfully compile"
+ cat compile-output.txt
exit 1
fi
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
new file mode 100644
index 0000000..9678fdb
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.entity;
+
+import java.util.Objects;
+
+/**
+ * A simple alert event.
+ */
+@SuppressWarnings("unused")
+public final class Alert {
+
+ private long id;
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Alert alert = (Alert) o;
+ return id == alert.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
+
+ @Override
+ public String toString() {
+ return "Alert{" +
+ "id=" + id +
+ '}';
+ }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
new file mode 100644
index 0000000..4332a8e
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.walkthrough.common.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.walkthrough.common.entity.Alert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A sink for outputting alerts.
+ */
+@PublicEvolving
+@SuppressWarnings("unused")
+public class AlertSink implements SinkFunction<Alert> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class);
+
+ @Override
+ public void invoke(Alert value, Context context) {
+ LOG.info(value.toString());
+ }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml
new file mode 100644
index 0000000..8788a0f
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-walkthroughs</artifactId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-walkthrough-datastream-java</artifactId>
+ <packaging>maven-archetype</packaging>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..ed235f5
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+ xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+ name="flink-walkthrough-datastream-java">
+ <fileSets>
+ <fileSet filtered="true" packaged="true" encoding="UTF-8">
+ <directory>src/main/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </fileSet>
+ <fileSet encoding="UTF-8">
+ <directory>src/main/resources</directory>
+ </fileSet>
+ </fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..8229d78
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,225 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>${groupId}</groupId>
+ <artifactId>${artifactId}</artifactId>
+ <version>${version}</version>
+ <packaging>jar</packaging>
+
+ <name>Flink Walkthrough DataStream Java</name>
+ <url>https://flink.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>@project.version@</flink.version>
+ <java.version>1.8</java.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <maven.compiler.source>${java.version}</maven.compiler.source>
+ <maven.compiler.target>${java.version}</maven.compiler.target>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+ <!-- Example:
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ -->
+
+ <!-- Add logging framework, to produce console output when running in the IDE. -->
+ <!-- These dependencies are excluded from the application JAR by default. -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.7</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Java Compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
+ <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+ <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.flink:force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>${package}.FraudDetectionJob</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+
+ <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <versionRange>[3.0.0,)</versionRange>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <versionRange>[3.1,)</versionRange>
+ <goals>
+ <goal>testCompile</goal>
+ <goal>compile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+ <!-- This profile helps to make things run out of the box in IntelliJ -->
+ <!-- It adds Flink's core classes to the runtime class path. -->
+ <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+ <profiles>
+ <profile>
+ <id>add-dependencies-for-IDEA</id>
+
+ <activation>
+ <property>
+ <name>idea.version</name>
+ </property>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
new file mode 100644
index 0000000..46019fd
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+/**
+ * Skeleton code for the datastream walkthrough.
+ */
+public class FraudDetectionJob {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Transaction> transactions = env
+ .addSource(new TransactionSource())
+ .name("transactions");
+
+ DataStream<Alert> alerts = transactions
+ .keyBy(Transaction::getAccountId)
+ .process(new FraudDetector())
+ .name("fraud-detector");
+
+ alerts
+ .addSink(new AlertSink())
+ .name("send-alerts");
+
+ env.execute("Fraud Detection");
+ }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
new file mode 100644
index 0000000..e5c034c
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+/**
+ * Skeleton code for implementing a fraud detector.
+ */
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final double SMALL_AMOUNT = 1.00;
+ private static final double LARGE_AMOUNT = 500.00;
+ private static final long ONE_MINUTE = 60 * 1000;
+
+ @Override
+ public void processElement(
+ Transaction transaction,
+ Context context,
+ Collector<Alert> collector) throws Exception {
+
+ Alert alert = new Alert();
+ alert.setId(transaction.getAccountId());
+
+ collector.collect(alert);
+ }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8be9b9a
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=warn, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=info
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
new file mode 100644
index 0000000..3e35e27
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-walkthroughs</artifactId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-walkthrough-datastream-scala</artifactId>
+ <packaging>maven-archetype</packaging>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..e78dc8d
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+ xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+ name="flink-walkthrough-datastream-scala">
+ <fileSets>
+ <fileSet filtered="true" packaged="true" encoding="UTF-8">
+ <directory>src/main/scala</directory>
+ <includes>
+ <include>**/*.scala</include>
+ </includes>
+ </fileSet>
+ <fileSet encoding="UTF-8">
+ <directory>src/main/resources</directory>
+ </fileSet>
+ </fileSets>
+</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..93956fa
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,256 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>${groupId}</groupId>
+ <artifactId>${artifactId}</artifactId>
+ <version>${version}</version>
+ <packaging>jar</packaging>
+
+ <name>Flink Walkthrough DataStream Scala</name>
+ <url>https://flink.apache.org</url>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>@project.version@</flink.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.12</scala.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- Apache Flink dependencies -->
+ <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+ <!-- Example:
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ -->
+
+ <!-- Add logging framework, to produce console output when running in the IDE. -->
+ <!-- These dependencies are excluded from the application JAR by default. -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.7</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+ <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.flink:force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>${package}.FraudDetectionJob</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Java Compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Eclipse Scala Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <!-- This profile helps to make things run out of the box in IntelliJ -->
+ <!-- It adds Flink's core classes to the runtime class path. -->
+ <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
+ <profiles>
+ <profile>
+ <id>add-dependencies-for-IDEA</id>
+
+ <activation>
+ <property>
+ <name>idea.version</name>
+ </property>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8be9b9a
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=warn, console
+log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=info
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
new file mode 100644
index 0000000..58e46e2
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+/**
+ * Skeleton code for the DataStream API code walkthrough: wires a transaction source through a keyed fraud detector into an alert sink, then runs the job.
+ */
+object FraudDetectionJob {
+
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // local or cluster context, chosen at submission time
+
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource) // walkthrough-provided source of sample transactions
+      .name("transactions") // operator name shown in the Flink UI and logs
+
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId) // partition the stream (and detector state) per account
+      .process(new FraudDetector) // skeleton detector defined in FraudDetector.scala
+      .name("fraud-detector")
+
+    alerts
+      .addSink(new AlertSink) // presumably logs each alert — see the AlertSink log4j entry; confirm in flink-walkthrough-common
+      .name("send-alerts")
+
+    env.execute("Fraud Detection") // the lazily-built pipeline only starts executing here
+  }
+}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
new file mode 100644
index 0000000..6d7d91d
--- /dev/null
+++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+/**
+ * Skeleton code for implementing a fraud detector: this starter version emits one alert for every transaction it sees.
+ */
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00 // thresholds for the walkthrough's detection rule; intentionally unused in the skeleton
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long = 60 * 1000L // one minute in milliseconds, for the later timer-based step
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { // keyed by account id (Long), per FraudDetectionJob's keyBy
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    val alert = new Alert // placeholder logic: flag every transaction's account
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert) // forward the alert downstream to the sink
+  }
+}
diff --git a/flink-walkthroughs/pom.xml b/flink-walkthroughs/pom.xml
index 2733f59..ce499e7 100644
--- a/flink-walkthroughs/pom.xml
+++ b/flink-walkthroughs/pom.xml
@@ -36,6 +36,8 @@ under the License.
<module>flink-walkthrough-common</module>
<module>flink-walkthrough-table-java</module>
<module>flink-walkthrough-table-scala</module>
+ <module>flink-walkthrough-datastream-java</module>
+ <module>flink-walkthrough-datastream-scala</module>
</modules>
<build>
<extensions>