Posted to commits@flink.apache.org by nk...@apache.org on 2020/07/30 14:44:53 UTC

[flink-web] branch asf-site updated (734f4a1 -> 008e907)

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


    from 734f4a1  Rebuild website
     new 707dd32  Add Blogpost: Advanced Flink Application Patterns: Custom Window Processing
     new 008e907  Rebuild website

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 _posts/2020-07-30-demo-fraud-detection-3.md        |  660 ++++++++++++
 content/blog/feed.xml                              | 1115 ++++++++++++++++----
 content/blog/index.html                            |   38 +-
 content/blog/page10/index.html                     |   38 +-
 content/blog/page11/index.html                     |   44 +-
 content/blog/page12/index.html                     |   45 +-
 content/blog/page13/index.html                     |   25 +
 content/blog/page2/index.html                      |   40 +-
 content/blog/page3/index.html                      |   38 +-
 content/blog/page4/index.html                      |   36 +-
 content/blog/page5/index.html                      |   38 +-
 content/blog/page6/index.html                      |   40 +-
 content/blog/page7/index.html                      |   38 +-
 content/blog/page8/index.html                      |   36 +-
 content/blog/page9/index.html                      |   37 +-
 .../img/blog/patterns-blog-3/evaluation-delays.png |  Bin 0 -> 29120 bytes
 .../blog/patterns-blog-3/keyed-state-scoping.png   |  Bin 0 -> 199113 bytes
 content/img/blog/patterns-blog-3/late-events.png   |  Bin 0 -> 20483 bytes
 .../img/blog/patterns-blog-3/pre-aggregation.png   |  Bin 0 -> 33817 bytes
 .../patterns-blog-3/sample-rule-definition.png     |  Bin 0 -> 98413 bytes
 content/img/blog/patterns-blog-3/time-windows.png  |  Bin 0 -> 37632 bytes
 content/img/blog/patterns-blog-3/type-kryo.png     |  Bin 0 -> 28294 bytes
 content/img/blog/patterns-blog-3/type-pojo.png     |  Bin 0 -> 34853 bytes
 content/img/blog/patterns-blog-3/widest-window.png |  Bin 0 -> 90233 bytes
 .../img/blog/patterns-blog-3/window-clean-up.png   |  Bin 0 -> 15498 bytes
 content/index.html                                 |   13 +-
 .../news/2020/07/30/demo-fraud-detection-3.html    |  908 ++++++++++++++++
 content/zh/index.html                              |   13 +-
 img/blog/patterns-blog-3/evaluation-delays.png     |  Bin 0 -> 29120 bytes
 img/blog/patterns-blog-3/keyed-state-scoping.png   |  Bin 0 -> 199113 bytes
 img/blog/patterns-blog-3/late-events.png           |  Bin 0 -> 20483 bytes
 img/blog/patterns-blog-3/pre-aggregation.png       |  Bin 0 -> 33817 bytes
 .../patterns-blog-3/sample-rule-definition.png     |  Bin 0 -> 98413 bytes
 img/blog/patterns-blog-3/time-windows.png          |  Bin 0 -> 37632 bytes
 img/blog/patterns-blog-3/type-kryo.png             |  Bin 0 -> 28294 bytes
 img/blog/patterns-blog-3/type-pojo.png             |  Bin 0 -> 34853 bytes
 img/blog/patterns-blog-3/widest-window.png         |  Bin 0 -> 90233 bytes
 img/blog/patterns-blog-3/window-clean-up.png       |  Bin 0 -> 15498 bytes
 38 files changed, 2791 insertions(+), 411 deletions(-)
 create mode 100644 _posts/2020-07-30-demo-fraud-detection-3.md
 create mode 100644 content/img/blog/patterns-blog-3/evaluation-delays.png
 create mode 100644 content/img/blog/patterns-blog-3/keyed-state-scoping.png
 create mode 100644 content/img/blog/patterns-blog-3/late-events.png
 create mode 100644 content/img/blog/patterns-blog-3/pre-aggregation.png
 create mode 100644 content/img/blog/patterns-blog-3/sample-rule-definition.png
 create mode 100644 content/img/blog/patterns-blog-3/time-windows.png
 create mode 100644 content/img/blog/patterns-blog-3/type-kryo.png
 create mode 100644 content/img/blog/patterns-blog-3/type-pojo.png
 create mode 100644 content/img/blog/patterns-blog-3/widest-window.png
 create mode 100644 content/img/blog/patterns-blog-3/window-clean-up.png
 create mode 100644 content/news/2020/07/30/demo-fraud-detection-3.html
 create mode 100644 img/blog/patterns-blog-3/evaluation-delays.png
 create mode 100644 img/blog/patterns-blog-3/keyed-state-scoping.png
 create mode 100644 img/blog/patterns-blog-3/late-events.png
 create mode 100644 img/blog/patterns-blog-3/pre-aggregation.png
 create mode 100644 img/blog/patterns-blog-3/sample-rule-definition.png
 create mode 100644 img/blog/patterns-blog-3/time-windows.png
 create mode 100644 img/blog/patterns-blog-3/type-kryo.png
 create mode 100644 img/blog/patterns-blog-3/type-pojo.png
 create mode 100644 img/blog/patterns-blog-3/widest-window.png
 create mode 100644 img/blog/patterns-blog-3/window-clean-up.png


[flink-web] 01/02: Add Blogpost: Advanced Flink Application Patterns: Custom Window Processing

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 707dd32128cf5a10c9ffda807db4e34694c96190
Author: Alexander Fedulov <14...@users.noreply.github.com>
AuthorDate: Wed Jul 22 14:01:40 2020 +0200

    Add Blogpost: Advanced Flink Application Patterns: Custom Window Processing
    
    This closes #362.
---
 _posts/2020-07-30-demo-fraud-detection-3.md        | 660 +++++++++++++++++++++
 img/blog/patterns-blog-3/evaluation-delays.png     | Bin 0 -> 29120 bytes
 img/blog/patterns-blog-3/keyed-state-scoping.png   | Bin 0 -> 199113 bytes
 img/blog/patterns-blog-3/late-events.png           | Bin 0 -> 20483 bytes
 img/blog/patterns-blog-3/pre-aggregation.png       | Bin 0 -> 33817 bytes
 .../patterns-blog-3/sample-rule-definition.png     | Bin 0 -> 98413 bytes
 img/blog/patterns-blog-3/time-windows.png          | Bin 0 -> 37632 bytes
 img/blog/patterns-blog-3/type-kryo.png             | Bin 0 -> 28294 bytes
 img/blog/patterns-blog-3/type-pojo.png             | Bin 0 -> 34853 bytes
 img/blog/patterns-blog-3/widest-window.png         | Bin 0 -> 90233 bytes
 img/blog/patterns-blog-3/window-clean-up.png       | Bin 0 -> 15498 bytes
 11 files changed, 660 insertions(+)

diff --git a/_posts/2020-07-30-demo-fraud-detection-3.md b/_posts/2020-07-30-demo-fraud-detection-3.md
new file mode 100644
index 0000000..a96ab03
--- /dev/null
+++ b/_posts/2020-07-30-demo-fraud-detection-3.md
@@ -0,0 +1,660 @@
+---
+layout: post
+title: "Advanced Flink Application Patterns Vol.3: Custom Window Processing"
+date: 2020-07-30T12:00:00.000Z
+authors:
+- alex:
+  name: "Alexander Fedulov"
+  twitter: "alex_fedulov"
+categories: news
+excerpt: In this series of blog posts you will learn about powerful Flink patterns for building streaming applications.
+---
+
+<style type="text/css">
+.tg  {border-collapse:collapse;border-spacing:0;}
+.tg td{padding:10px 10px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;}
+.tg th{padding:10px 10px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;background-color:#eff0f1;}
+.tg .tg-wide{padding:10px 30px;}
+.tg .tg-top{vertical-align:top}
+.tg .tg-topcenter{text-align:center;vertical-align:top}
+.tg .tg-center{text-align:center;vertical-align:center}
+</style>
+
+## Introduction
+
+In the previous articles of the series, we described how you can achieve
+flexible stream partitioning based on dynamically-updated configurations
+(a set of fraud-detection rules) and how you can utilize Flink\'s
+Broadcast mechanism to distribute processing configuration at runtime
+among the relevant operators. 
+
+Picking up where we left off in the discussion of the end-to-end
+solution last time, in this article we will describe how you can use the
+\"Swiss Army knife\" of Flink - the [*Process Function*](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html) to create an
+implementation that is tailor-made to match your streaming business
+logic requirements. Our discussion will continue in the context of the
+[Fraud Detection engine]({{ site.baseurl }}/news/2020/01/15/demo-fraud-detection.html#fraud-detection-demo). We will also demonstrate how you can
+implement your own **custom replacement for time windows** for cases
+where the out-of-the-box windowing available from the DataStream API
+does not satisfy your requirements. In particular, we will look at the
+trade-offs that you can make when designing a solution which requires
+low-latency reactions to individual events.
+
+This article will describe some high-level concepts that can be applied
+independently, but it is recommended that you review the material in
+[part one]({{ site.baseurl }}/news/2020/01/15/demo-fraud-detection.html) and
+[part two]({{ site.baseurl }}/news/2020/03/24/demo-fraud-detection-2.html) of the series as well as check out the [code
+base](https://github.com/afedulov/fraud-detection-demo) in order to make
+it easier to follow along.
+
+## ProcessFunction as a "Window"
+
+### Low Latency
+
+Let's start with a reminder of the type of fraud detection rule that we
+would like to support:
+
+*"Whenever the **sum** of  **payments** from the same **payer** to the
+same **beneficiary** within **a 24 hour
+period** is **greater** than **200 000 \$** - trigger an alert."*
+
+In other words, given a stream of transactions partitioned by a key that
+combines the payer and the beneficiary fields, we would like to look
+back in time and determine, for each incoming transaction, if the sum of
+all previous payments between the two specific participants exceeds the
+defined threshold. In effect, the computation window is always moved
+along to the position of the last observed event for a particular data
+partitioning key.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/patterns-blog-3/time-windows.png" width="600px" alt="Figure 1: Time Windows"/>
+<br/>
+<i><small>Figure 1: Time Windows</small></i>
+</center>
+<br/>
+
+
+One of the common key requirements for a fraud detection system is *low
+response time*. The sooner the fraudulent action gets detected, the
+higher the chances that it can be blocked and its negative consequences
+mitigated. This requirement is especially prominent in the financial
+domain, where you have one important constraint - any time spent
+evaluating a fraud detection model is time that a law-abiding user of
+your system will spend waiting for a response. Swiftness of processing
+often becomes a competitive advantage between various payment systems,
+and the time limit for producing an alert can be as low as *300-500
+ms*. This is all the time you get from the moment of ingestion of a
+transaction event into a fraud detection system until an alert has to
+become available to downstream systems. 
+
+As you might know, Flink provides a powerful [Window
+API](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html)
+that is applicable for a wide range of use cases. However, if you go
+over all of the available types of supported windows, you will realize
+that none of them exactly match our main requirement for this use case -
+the low-latency evaluation of *each* incoming transaction. There is
+no type of window in Flink that can express the *"x minutes/hours/days
+back from the <u>current event</u>"* semantic. In the Window API, events
+fall into windows (as defined by the window
+[assigners](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#window-assigners)),
+but they cannot themselves individually control the creation and
+evaluation of windows\*. As described above, our goal for the fraud
+detection engine is to achieve immediate evaluation of the previous
+relevant data points as soon as the new event is received. This raises
+the question of the feasibility of applying the Window API in this case. The Window API does offer some options for defining custom triggers, evictors, and window assigners, which might eventually produce the required result. However, it is usually difficult to get this right (and easy to break). Moreover, this approach does not provide access to broadcast state, which is required for implementing dynamic reconfiguration of business rules.
+
+\*) apart from session windows, which are limited to assignments
+based on session [gaps](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#session-windows)
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/patterns-blog-3/evaluation-delays.png" width="600px" alt="Figure 2: Evaluation Delays"/>
+<br/>
+<i><small>Figure 2: Evaluation Delays</small></i>
+</center>
+<br/>
+
+Let's take an example of using a [sliding
+window](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#sliding-windows)
+from Flink's Window API. Using sliding windows with a slide of *S*
+translates into an expected evaluation delay of *S/2*.
+This means that you would need to define a window slide of 600-1000 ms
+to fulfill the low-latency requirement of 300-500 ms delay, even before
+taking any actual computation time into account. The fact that Flink
+stores a separate window state for each sliding window pane renders this
+approach unfeasible under any moderately high load conditions.
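+
+For illustration, the sliding-window variant discussed above would look roughly
+like the following sketch. The key selector and the `SumAggregateFunction` helper
+are assumptions made for this example and are not part of the demo code:
+
+```java
+// Sketch only: approximating per-event evaluation with the Window API.
+// `transactions` is assumed to be the input DataStream<Transaction>.
+// A 24-hour window that slides every second keeps the expected evaluation delay
+// around 500 ms, but forces Flink to keep state for 86,400 overlapping panes.
+DataStream<BigDecimal> windowedSums = transactions
+    .keyBy(t -> t.getPayerId() + "-" + t.getBeneficiaryId())         // assumed key selector
+    .window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(1)))
+    .aggregate(new SumAggregateFunction());                          // assumed AggregateFunction
+```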
+
+In order to satisfy the requirements, we need to create our own
+low-latency window implementation. Luckily, Flink gives us all the tools
+required to do so. `ProcessFunction` is a low-level, but powerful
+building block in Flink\'s API. It has a simple contract:
+
+```java
+public class SomeProcessFunction extends KeyedProcessFunction<KeyType, InputType, OutputType> {
+
+	public void processElement(InputType event, Context ctx, Collector<OutputType> out){}
+
+	public void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputType> out) {}
+
+	public void open(Configuration parameters){}
+}
+```
+
+-   `processElement()` receives input events one by one. You can react to
+    each input by producing one or more output events to the next
+    operator by calling `out.collect(someOutput)`. You can also pass data
+    to a [side
+    output](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html)
+    or ignore a particular input altogether.
+
+-   `onTimer()` is called by Flink when a previously-registered timer
+    fires. Both event time and processing time timers are supported.
+
+-   `open()` is equivalent to a constructor. It is called inside of the
+    [TaskManager's](https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/glossary.html#flink-taskmanager)
+    JVM, and is used for initialization, such as registering
+    Flink-managed state. It is also the right place to initialize fields
+    that are not serializable and cannot be transferred from the
+    JobManager's JVM.
+
+Most importantly, `ProcessFunction` also has access to the fault-tolerant
+state, handled by Flink. This combination, together with Flink\'s
+message processing and delivery guarantees, makes it possible to build
+resilient event-driven applications with almost arbitrarily
+sophisticated business logic. This includes creation and processing of
+custom windows with state.
+
+### Implementation
+
+#### State and Clean-up
+
+In order to be able to process time windows, we need to keep track of
+data belonging to the window inside of our program. To ensure that this
+data is fault-tolerant and can survive failures in a distributed system,
+we should store it inside of Flink-managed state. As time progresses,
+we do not need to keep all previous transactions. According
+to the sample rule, all events that are older than 24 hours become
+irrelevant. We are looking at a window of data that constantly moves and
+where stale transactions need to be continuously moved out of scope (in
+other words, cleaned up from state).
+
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/patterns-blog-3/window-clean-up.png" width="400px" alt="Figure 3: Window Clean-up"/>
+<br/>
+<i><small>Figure 3: Window Clean-up</small></i>
+</center>
+<br/>
+
+
+We will
+[use](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state)
+`MapState` to store the individual events of the window. In order to allow
+efficient clean-up of the out-of-scope events, we will utilize event
+timestamps as the `MapState` keys.
+
+In the general case, we have to take into account the fact that different
+events might carry exactly the same timestamp. Therefore, instead of storing
+an individual `Transaction` per key (timestamp), we will store sets of them.
+
+```java
+MapState<Long, Set<Transaction>> windowState;
+```
+
+<div class="alert alert-info" markdown="1">
+<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Side Note </span>
+when any Flink-managed state is used inside a
+`KeyedProcessFunction`, the data returned by the `state.value()` call is
+automatically scoped by the key of the *currently-processed event*
+- see Figure 4. If `MapState` is used, the same principle applies, with
+the difference that a `Map` is returned instead of `MyObject`. If you are
+compelled to do something like
+`mapState.get(inputEvent.getKey())`, you should probably be using
+`ValueState` instead of the `MapState`. As we want to store *multiple values
+per event key*, in our case, `MapState` is the right choice.
+
+<br/>
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/patterns-blog-3/keyed-state-scoping.png" width="800px" alt="Figure 4: Keyed State Scoping"/>
+<br/>
+<i><small>Figure 4: Keyed State Scoping</small></i>
+</center>
+
+</div>
+
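+For reference, such a `MapState` would typically be registered in the `open()`
+method of the process function, roughly as follows (the descriptor name and type
+information here are illustrative and may differ from the demo project):
+
+```java
+@Override
+public void open(Configuration parameters) {
+  MapStateDescriptor<Long, Set<Transaction>> windowStateDescriptor =
+      new MapStateDescriptor<>(
+          "windowState",
+          BasicTypeInfo.LONG_TYPE_INFO,
+          TypeInformation.of(new TypeHint<Set<Transaction>>() {}));
+  windowState = getRuntimeContext().getMapState(windowStateDescriptor);
+}
+```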
+
+As described in the [first blog of the series]({{ site.baseurl }}/news/2020/01/15/demo-fraud-detection.html), we are dispatching events based on the keys
+specified in the active fraud detection rules. Multiple distinct rules
+can be based on the same grouping key. This means that our alerting
+function can potentially receive transactions scoped by the same key
+(e.g. `{payerId=25;beneficiaryId=12}`), but destined to be evaluated
+according to different rules, which implies potentially different
+lengths of the time windows. This raises the question of how we can best
+store fault-tolerant window state within the `KeyedProcessFunction`. One
+approach would be to create and manage separate `MapStates` per rule. Such
+an approach, however, would be wasteful - we would separately hold state
+for overlapping time windows, and therefore unnecessarily store
+duplicate events. A better approach is to always store just enough data
+to be able to evaluate all currently active rules which are scoped by
+the same key. In order to achieve that, whenever a new rule is added, we
+will check whether its time window has the widest span seen so far and, if
+so, store it in the broadcast state under the special reserved `WIDEST_RULE_KEY`. This
+information will later be used during the state clean-up procedure, as
+described later in this section.
+
+```java
+@Override
+public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out){
+  ...
+  updateWidestWindowRule(rule, broadcastState);
+}
+
+private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState){
+  Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);
+
+  if (widestWindowRule == null) {
+    broadcastState.put(WIDEST_RULE_KEY, rule);
+    return;
+  }
+
+  if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {
+    broadcastState.put(WIDEST_RULE_KEY, rule);
+  }
+}
+```
+
+Let's now look at the implementation of the main method,
+`processElement()`, in some detail.
+
+In the [previous blog post]({{ site.baseurl }}/news/2020/01/15/demo-fraud-detection.html#dynamic-data-partitioning), we described how `DynamicKeyFunction` allowed
+us to perform dynamic data partitioning based on the `groupingKeyNames`
+parameter in the rule definition. The subsequent description focuses on
+the `DynamicAlertFunction`, which makes use of the remaining rule
+settings.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/patterns-blog-3/sample-rule-definition.png" width="700px" alt="Figure 5: Sample Rule Definition"/>
+<br/>
+<i><small>Figure 5: Sample Rule Definition</small></i>
+</center>
+<br/>
+
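+Putting the rule-related fields referenced throughout this post together, a rule
+like the one in Figure 5 could be modeled roughly as the following POJO. This is
+a sketch for orientation only; the enum types and exact field names are
+assumptions and may differ from the demo project:
+
+```java
+public class Rule {
+  private Integer ruleId;
+  private List<String> groupingKeyNames;                 // e.g. ["payerId", "beneficiaryId"]
+  private String aggregateFieldName;                     // e.g. "paymentAmount"
+  private AggregatorFunctionType aggregatorFunctionType; // assumed enum, e.g. SUM
+  private LimitOperatorType limitOperatorType;           // assumed enum, e.g. GREATER
+  private BigDecimal limit;                              // e.g. 200000
+  private Integer windowMinutes;                         // e.g. 1440 (24 hours)
+
+  public Long getWindowMillis() {
+    return Time.minutes(this.windowMinutes).toMilliseconds();
+  }
+
+  // getRuleId(), getAggregateFieldName(), apply(), getters and setters omitted
+}
+```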
+
+As described in the previous parts of the blog post
+series, our alerting process function receives events of type
+`Keyed<Transaction, String, Integer>`, where `Transaction` is the main
+"wrapped" event, `String` is the key (*payer \#x - beneficiary \#y* in
+Figure 1), and `Integer` is the ID of the rule that caused the dispatch of
+this event. This rule was previously [stored in the broadcast state]({{ site.baseurl }}/news/2020/03/24/demo-fraud-detection-2.html#broadcast-state-pattern) and has to be retrieved from that state by the ID. Here is the
+outline of the implementation:
+
+```java
+public class DynamicAlertFunction
+    extends KeyedBroadcastProcessFunction<
+        String, Keyed<Transaction, String, Integer>, Rule, Alert> {
+
+  private transient MapState<Long, Set<Transaction>> windowState;
+
+  @Override
+  public void processElement(
+      Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out){
+
+    // Add Transaction to state
+    long currentEventTime = value.getWrapped().getEventTime();                            // <--- (1)
+    addToStateValuesSet(windowState, currentEventTime, value.getWrapped());
+
+    // Calculate the aggregate value
+    Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());    // <--- (2)
+    Long windowStartForEvent = rule.getWindowStartTimestampFor(currentEventTime);         // <--- (3)
+
+    SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);            // <--- (4)
+    for (Long stateEventTime : windowState.keys()) {
+      if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {
+        aggregateValuesInState(stateEventTime, aggregator, rule);
+      }
+    }
+
+    // Evaluate the rule and trigger an alert if violated
+    BigDecimal aggregateResult = aggregator.getLocalValue();                              // <--- (5)
+    boolean isRuleViolated = rule.apply(aggregateResult);
+    if (isRuleViolated) {
+      long decisionTime = System.currentTimeMillis();
+      out.collect(new Alert<>(rule.getRuleId(),
+                              rule,
+                              value.getKey(),
+                              decisionTime,
+                              value.getWrapped(),
+                              aggregateResult));
+    }
+
+    // Register timers to ensure state cleanup
+    long cleanupTime = (currentEventTime / 1000) * 1000;                                  // <--- (6)
+    ctx.timerService().registerEventTimeTimer(cleanupTime);
+  }
+```
+
+<br/>
+Here are the details of the steps:  
+1)  We first add each new event to our window state:
+
+```java
+static <K, V> Set<V> addToStateValuesSet(MapState<K, Set<V>> mapState, K key, V value)
+      throws Exception {
+    Set<V> valuesSet = mapState.get(key);
+    if (valuesSet == null) {
+      valuesSet = new HashSet<>();
+    }
+    valuesSet.add(value);
+    mapState.put(key, valuesSet);
+    return valuesSet;
+}
+```
+
+2) Next, we retrieve the previously-broadcasted rule, according to
+    which the incoming transaction needs to be evaluated.
+
+3) `getWindowStartTimestampFor` determines, given the window span defined
+    in the rule and the current transaction timestamp, how far back in
+    time our evaluation should reach.
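+
+A minimal sketch of what such a method might look like on the `Rule` class (the
+demo's actual implementation may differ):
+
+```java
+public long getWindowStartTimestampFor(long currentEventTime) {
+  // the evaluated window spans [currentEventTime - windowMillis, currentEventTime]
+  return currentEventTime - getWindowMillis();
+}
+```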
+
+4) The aggregate value is calculated by iterating over all window state
+    entries and applying an aggregate function. It could be an *average,
+    max, min* or, as in the example rule from the beginning of this
+    section, a *sum*.
+
+```java
+private boolean isStateValueInWindow(
+    Long stateEventTime, Long windowStartForEvent, long currentEventTime) {
+  return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;
+}
+
+private void aggregateValuesInState(
+    Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule) throws Exception {
+  Set<Transaction> inWindow = windowState.get(stateEventTime);
+  for (Transaction event : inWindow) {
+    BigDecimal aggregatedValue =
+        FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);
+    aggregator.add(aggregatedValue);
+  }
+}
+```
+
+5) Having an aggregate value, we can compare it to the threshold value
+    that is specified in the rule definition and fire an alert, if
+    necessary.
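+
+A hedged sketch of what `rule.apply()` could look like; the operator enum below is
+an assumption for illustration and not necessarily the demo's exact API:
+
+```java
+// Sketch: comparing the calculated aggregate against the limit from the rule definition.
+public boolean apply(BigDecimal aggregateResult) {
+  switch (limitOperatorType) {            // assumed enum field, e.g. GREATER
+    case GREATER:       return aggregateResult.compareTo(limit) > 0;
+    case GREATER_EQUAL: return aggregateResult.compareTo(limit) >= 0;
+    case LESS:          return aggregateResult.compareTo(limit) < 0;
+    case LESS_EQUAL:    return aggregateResult.compareTo(limit) <= 0;
+    default:
+      throw new RuntimeException("Unknown limit operator type: " + limitOperatorType);
+  }
+}
+```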
+
+6) At the end, we register a clean-up timer using
+    `ctx.timerService().registerEventTimeTimer()`. This timer will be
+    responsible for removing the current transaction once it moves out of
+    scope.
+
+<div class="alert alert-info" markdown="1">
+<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span>  Note </span>
+Notice the rounding during timer creation. It is an important technique
+which enables a reasonable trade-off between the precision with which
+the timers will be triggered, and the number of timers being used.
+Timers are stored in Flink's fault-tolerant state, and managing them
+with millisecond-level precision can be wasteful. In our case, with this
+rounding, we will create at most one timer per key in any given second. Flink documentation provides some additional [<u>details</u>](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#timer-coalescing).
+</div>
+
+7) The `onTimer` method will trigger the clean-up of the window state.
+
+As previously described, we are always keeping as many events in the
+state as required for the evaluation of an active rule with the widest
+window span. This means that during the clean-up, we only need to remove
+the state which is out of scope of this widest window.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/patterns-blog-3/widest-window.png" width="800px" alt="Figure 6: Widest Window"/>
+<br/>
+<i><small>Figure 6: Widest Window</small></i>
+</center>
+<br/>
+
+This is how the clean-up procedure can be implemented:
+
+```java
+@Override
+public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)
+    throws Exception {
+
+  Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);
+
+  Optional<Long> cleanupEventTimeWindow =
+      Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);
+  Optional<Long> cleanupEventTimeThreshold =
+      cleanupEventTimeWindow.map(window -> timestamp - window);
+  // Remove events that are older than (timestamp - widestWindowSpan)ms
+  cleanupEventTimeThreshold.ifPresent(this::evictOutOfScopeElementsFromWindow);
+}
+
+private void evictOutOfScopeElementsFromWindow(Long threshold) {
+  try {
+    Iterator<Long> keys = windowState.keys().iterator();
+    while (keys.hasNext()) {
+      Long stateEventTime = keys.next();
+      if (stateEventTime < threshold) {
+        keys.remove();
+      }
+    }
+  } catch (Exception ex) {
+    throw new RuntimeException(ex);
+  }
+}
+```
+
+<div class="alert alert-info" markdown="1">
+<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span>  Note</span>
+You might be wondering why we did not use `ListState`, as we are always
+iterating over all of the values of the window state anyway. This is actually
+an optimization for the case when `RocksDBStateBackend`
+[is used](https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#the-rocksdbstatebackend). Iterating over a `ListState` would cause all of the `Transaction`
+objects to be deserialized. Using `MapState`\'s keys iterator only causes
+deserialization of the keys (type `Long`), and therefore reduces the
+computational overhead.
+</div>
+
+This concludes the description of the implementation details. Our
+approach triggers evaluation of a time window as soon as a new
+transaction arrives. It therefore fulfills the main requirement that we
+have targeted - low delay for potentially issuing an alert. For the
+complete implementation, please have a look at
+[the project on GitHub](https://github.com/afedulov/fraud-detection-demo).
+
+## Improvements and Optimizations
+
+What are the pros and cons of the described approach?
+
+**Pros:**
+
+-   Low latency capabilities
+
+-   Tailored solution with potential use-case specific optimizations
+
+-   Efficient state reuse (shared state for the rules with the same key)
+
+**Cons:**
+
+-   Cannot make use of potential future optimizations in the existing
+    Window API
+
+-   No late event handling, which is available out of the box in the
+    Window API
+
+-   Quadratic computation complexity and potentially large state
+
+Let's now look at the latter two drawbacks and see if we can address
+them.
+
+#### Late events:
+
+Processing late events poses a question - is it still meaningful
+to re-evaluate the window when a late event arrives? If it is,
+you would need to extend the widest window used for the
+clean-up by your maximum expected out-of-orderness. This would avoid
+having potentially incomplete time window data for such late firings
+(see Figure 7).
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/patterns-blog-3/late-events.png" width="500px" alt="Figure 7: Late Events Handling"/>
+<br/>
+<i><small>Figure 7: Late Events Handling</small></i>
+</center>
+<br/>
+
+It can be argued, however, that for a use case that emphasizes low-latency
+processing, such late triggering would be meaningless. In this
+case, we could keep track of the most recent timestamp that we have
+observed so far, and for events that do not advance this value, only add
+them to the state and skip the aggregate calculation and the alert
+triggering logic, as sketched below.
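+
+A possible sketch of that idea, assuming an additional `ValueState<Long>` field
+(here called `lastSeenEventTime`) registered in `open()` - this is not part of the
+demo project's code:
+
+```java
+// Inside processElement(), before running the aggregation:
+Long lastSeen = lastSeenEventTime.value();
+boolean isLateEvent = lastSeen != null && currentEventTime < lastSeen;
+
+addToStateValuesSet(windowState, currentEventTime, value.getWrapped());
+if (isLateEvent) {
+  return;                                  // keep the event in state, but skip re-evaluation
+}
+lastSeenEventTime.update(currentEventTime);
+// ... continue with aggregation, rule evaluation and timer registration
+```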
+
+#### Redundant Re-computations and State Size:
+
+In the implementation described above, we keep individual transactions in state
+and iterate over them to recalculate the aggregate on every new
+event. This is clearly suboptimal - we waste computational
+resources on repeated calculations.
+
+What is the main reason to keep the individual transactions in state?
+The granularity of stored events directly corresponds to the precision
+of the time window calculation. Because we store transactions
+individually, we can precisely ignore individual transactions as soon as
+they leave the exact 2,592,000,000 ms time window (30 days in milliseconds). At this
+point, it is worth raising the question - do we really need
+millisecond precision when evaluating such a long time window, or is it
+OK to accept potential false positives in exceptional cases? If the
+answer for your use case is that such precision is not needed, you could
+implement an additional optimization based on bucketing and
+pre-aggregation. The idea of this optimization can be broken down as
+follows:
+
+-   Instead of storing individual events, create a parent class that can
+    either contain the fields of a single transaction, or combined values
+    calculated by applying an aggregate function to a set of
+    transactions.
+
+-   Instead of using timestamps in milliseconds as `MapState` keys, round
+    them to the level of "resolution" that you are willing to accept
+    (for instance, a full minute). Each entry therefore represents a
+    bucket.
+
+-   Whenever a window is evaluated, append the new transaction's data to
+    the bucket aggregate instead of storing individual data points per
+    transaction.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/patterns-blog-3/pre-aggregation.png" width="700px" alt="Figure 8: Pre-aggregation"/>
+<br/>
+<i><small>Figure 8: Pre-aggregation</small></i>
+</center>
+<br/>
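+
+A rough sketch of the bucketing step (the one-minute resolution and the helper
+name are illustrative assumptions):
+
+```java
+// Round event timestamps down to one-minute buckets before using them as MapState keys.
+private static long toBucketStart(long eventTimeMillis) {
+  long bucketSizeMillis = 60 * 1000L;      // the chosen "resolution" of the window
+  return (eventTimeMillis / bucketSizeMillis) * bucketSizeMillis;
+}
+```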
+
+#### State Data and Serializers
+
+Another question that we can ask ourselves in order to further optimize
+the implementation is how probable it is to get different events with
+exactly the same timestamp. In the described implementation, we
+demonstrated one way of approaching this question by storing sets of
+transactions per timestamp in `MapState<Long, Set<Transaction>>`. Such
+a choice, however, might have a more significant effect on performance
+than one might anticipate. The reason is that Flink does not currently
+provide a native `Set` serializer and will enforce a fallback to the less
+efficient [Kryo
+serializer](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/types_serialization.html#general-class-types)
+instead
+([FLINK-16729](https://issues.apache.org/jira/browse/FLINK-16729)). A
+meaningful alternative strategy is to assume that, in a normal scenario,
+no two distinct events can have exactly the same timestamp and to turn
+the window state into a `MapState<Long, Transaction>` type. You can use
+[side-outputs](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html)
+to collect and monitor any unexpected occurrences which contradict your
+assumption. During performance optimizations, I generally recommend that
+you [disable the fallback to
+Kryo](https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#disabling-kryo)
+and verify where your application might be further optimized by ensuring
+that [more efficient
+serializers](https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#performance-comparison)
+are being used.
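+
+Disabling the fallback can be done on the execution environment, for example (a
+standard Flink setting, shown here as a reminder rather than demo-specific code):
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// Fail fast instead of silently falling back to Kryo for non-POJO types:
+env.getConfig().disableGenericTypes();
+```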
+
+<div class="alert alert-info" markdown="1">
+<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span>  Tip:</span>
+you can quickly determine which serializer is going to be
+used for your classes by setting a breakpoint and verifying the type of
+the returned `TypeInformation`.
+<br/>
+
+<center>
+<table class="tg">
+  <tr>
+    <td class="tg-topcenter">
+      <img src="{{ site.baseurl }}/img/blog/patterns-blog-3/type-pojo.png" alt="POJO"/></td>
+    <td class="tg-topcenter">
+      <i>PojoTypeInfo</i> indicates that an efficient Flink POJO serializer will be used.</td>
+  </tr>
+  <tr>
+    <td class="tg-top">
+      <img src="{{ site.baseurl }}/img/blog/patterns-blog-3/type-kryo.png" alt="Kryo"/></td>
+    <td class="tg-topcenter">
+      <i>GenericTypeInfo</i> indicates the fallback to a Kryo serializer.</td>
+  </tr>
+</table>
+</center>
+</div>
+
+**Event pruning**: instead of storing complete events and putting
+additional stress on the ser/de machinery, we can reduce the data of
+individual events to only the relevant information. This would potentially require
+"unpacking" individual events as fields, and storing those fields into a
+generic `Map<String, Object>` data structure, based on the
+configurations of active rules.
+
+While this adjustment could potentially produce significant improvements
+for objects of large size, it should not be your first pick as it can
+easily turn into a premature optimization.
+
+## Summary:
+
+This article concludes the description of the implementation of the
+fraud detection engine that we started in [part one]({{ site.baseurl }}/news/2020/01/15/demo-fraud-detection.html). In this blog
+post we demonstrated how `ProcessFunction` can be utilized to
+\"impersonate\" a window with sophisticated custom logic. We have
+discussed the pros and cons of such an approach and elaborated on how custom
+use-case-specific optimizations can be applied - something that would
+not be directly possible with the Window API.
+
+The goal of this blog post was to illustrate the power and flexibility
+of Apache Flink's APIs. At their core are the pillars of Flink, which
+spare you, as a developer, a very significant amount of work and
+generalize well to a wide range of use cases by providing:
+
+-   Efficient data exchange in a distributed cluster
+
+-   Horizontal scalability via data partitioning
+
+-   Fault-tolerant state with quick, local access
+
+-   Convenient abstraction for working with this state, which is as simple as using a
+    local variable
+
+-   Multi-threaded, parallel execution engine. `ProcessFunction` code runs
+    in a single thread, without the need for synchronization. Flink
+    handles all the parallel execution aspects and correct access to the
+    shared state, without you, as a developer, having to think about it
+    (concurrency is hard).
+
+All these aspects make it possible to build applications with Flink that
+go well beyond trivial streaming ETL use cases and enable implementation
+of arbitrarily-sophisticated, distributed event-driven applications.
+With Flink, you can rethink approaches to a wide range of use cases
+which normally would rely on using stateless parallel execution nodes
+and "pushing" the concerns of state fault tolerance to a database, an
+approach that is often destined to run into scalability issues in the
+face of ever-increasing data volumes.
diff --git a/img/blog/patterns-blog-3/evaluation-delays.png b/img/blog/patterns-blog-3/evaluation-delays.png
new file mode 100644
index 0000000..2edf942
Binary files /dev/null and b/img/blog/patterns-blog-3/evaluation-delays.png differ
diff --git a/img/blog/patterns-blog-3/keyed-state-scoping.png b/img/blog/patterns-blog-3/keyed-state-scoping.png
new file mode 100644
index 0000000..fa6fcc3
Binary files /dev/null and b/img/blog/patterns-blog-3/keyed-state-scoping.png differ
diff --git a/img/blog/patterns-blog-3/late-events.png b/img/blog/patterns-blog-3/late-events.png
new file mode 100644
index 0000000..5bdfad1
Binary files /dev/null and b/img/blog/patterns-blog-3/late-events.png differ
diff --git a/img/blog/patterns-blog-3/pre-aggregation.png b/img/blog/patterns-blog-3/pre-aggregation.png
new file mode 100644
index 0000000..8005dc5
Binary files /dev/null and b/img/blog/patterns-blog-3/pre-aggregation.png differ
diff --git a/img/blog/patterns-blog-3/sample-rule-definition.png b/img/blog/patterns-blog-3/sample-rule-definition.png
new file mode 100644
index 0000000..1a625c9
Binary files /dev/null and b/img/blog/patterns-blog-3/sample-rule-definition.png differ
diff --git a/img/blog/patterns-blog-3/time-windows.png b/img/blog/patterns-blog-3/time-windows.png
new file mode 100644
index 0000000..5649e27
Binary files /dev/null and b/img/blog/patterns-blog-3/time-windows.png differ
diff --git a/img/blog/patterns-blog-3/type-kryo.png b/img/blog/patterns-blog-3/type-kryo.png
new file mode 100644
index 0000000..eaf7153
Binary files /dev/null and b/img/blog/patterns-blog-3/type-kryo.png differ
diff --git a/img/blog/patterns-blog-3/type-pojo.png b/img/blog/patterns-blog-3/type-pojo.png
new file mode 100644
index 0000000..1f8cef0
Binary files /dev/null and b/img/blog/patterns-blog-3/type-pojo.png differ
diff --git a/img/blog/patterns-blog-3/widest-window.png b/img/blog/patterns-blog-3/widest-window.png
new file mode 100644
index 0000000..0376d64
Binary files /dev/null and b/img/blog/patterns-blog-3/widest-window.png differ
diff --git a/img/blog/patterns-blog-3/window-clean-up.png b/img/blog/patterns-blog-3/window-clean-up.png
new file mode 100644
index 0000000..32b2bdc
Binary files /dev/null and b/img/blog/patterns-blog-3/window-clean-up.png differ


[flink-web] 02/02: Rebuild website

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 008e9070b888560f289008339fecdbf596242849
Author: Nico Kruber <ni...@gmail.com>
AuthorDate: Tue Jul 28 16:56:48 2020 +0200

    Rebuild website
---
 content/blog/feed.xml                              | 1115 ++++++++++++++++----
 content/blog/index.html                            |   38 +-
 content/blog/page10/index.html                     |   38 +-
 content/blog/page11/index.html                     |   44 +-
 content/blog/page12/index.html                     |   45 +-
 content/blog/page13/index.html                     |   25 +
 content/blog/page2/index.html                      |   40 +-
 content/blog/page3/index.html                      |   38 +-
 content/blog/page4/index.html                      |   36 +-
 content/blog/page5/index.html                      |   38 +-
 content/blog/page6/index.html                      |   40 +-
 content/blog/page7/index.html                      |   38 +-
 content/blog/page8/index.html                      |   36 +-
 content/blog/page9/index.html                      |   37 +-
 .../img/blog/patterns-blog-3/evaluation-delays.png |  Bin 0 -> 29120 bytes
 .../blog/patterns-blog-3/keyed-state-scoping.png   |  Bin 0 -> 199113 bytes
 content/img/blog/patterns-blog-3/late-events.png   |  Bin 0 -> 20483 bytes
 .../img/blog/patterns-blog-3/pre-aggregation.png   |  Bin 0 -> 33817 bytes
 .../patterns-blog-3/sample-rule-definition.png     |  Bin 0 -> 98413 bytes
 content/img/blog/patterns-blog-3/time-windows.png  |  Bin 0 -> 37632 bytes
 content/img/blog/patterns-blog-3/type-kryo.png     |  Bin 0 -> 28294 bytes
 content/img/blog/patterns-blog-3/type-pojo.png     |  Bin 0 -> 34853 bytes
 content/img/blog/patterns-blog-3/widest-window.png |  Bin 0 -> 90233 bytes
 .../img/blog/patterns-blog-3/window-clean-up.png   |  Bin 0 -> 15498 bytes
 content/index.html                                 |   13 +-
 .../news/2020/07/30/demo-fraud-detection-3.html    |  908 ++++++++++++++++
 content/zh/index.html                              |   13 +-
 27 files changed, 2131 insertions(+), 411 deletions(-)

diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index 4f96e80..3bf09d8 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -7,6 +7,671 @@
 <atom:link href="https://flink.apache.org/blog/feed.xml" rel="self" type="application/rss+xml" />
 
 <item>
+<title>Advanced Flink Application Patterns Vol.3: Custom Window Processing</title>
+<description>&lt;style type=&quot;text/css&quot;&gt;
+.tg  {border-collapse:collapse;border-spacing:0;}
+.tg td{padding:10px 10px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;}
+.tg th{padding:10px 10px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;background-color:#eff0f1;}
+.tg .tg-wide{padding:10px 30px;}
+.tg .tg-top{vertical-align:top}
+.tg .tg-topcenter{text-align:center;vertical-align:top}
+.tg .tg-center{text-align:center;vertical-align:center}
+&lt;/style&gt;
+
+&lt;h2 id=&quot;introduction&quot;&gt;Introduction&lt;/h2&gt;
+
+&lt;p&gt;In the previous articles of the series, we described how you can achieve
+flexible stream partitioning based on dynamically-updated configurations
+(a set of fraud-detection rules) and how you can utilize Flink&#39;s
+Broadcast mechanism to distribute processing configuration at runtime
+among the relevant operators. &lt;/p&gt;
+
+&lt;p&gt;Following up directly where we left the discussion of the end-to-end
+solution last time, in this article we will describe how you can use the
+&quot;Swiss knife&quot; of Flink - the &lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html&quot;&gt;&lt;em&gt;Process Function&lt;/em&gt;&lt;/a&gt; to create an
+implementation that is tailor-made to match your streaming business
+logic requirements. Our discussion will continue in the context of the
+&lt;a href=&quot;/news/2020/01/15/demo-fraud-detection.html#fraud-detection-demo&quot;&gt;Fraud Detection engine&lt;/a&gt;. We will also demonstrate how you can
+implement your own &lt;strong&gt;custom replacement for time windows&lt;/strong&gt; for cases
+where the out-of-the-box windowing available from the DataStream API
+does not satisfy your requirements. In particular, we will look at the
+trade-offs that you can make when designing a solution which requires
+low-latency reactions to individual events.&lt;/p&gt;
+
+&lt;p&gt;This article will describe some high-level concepts that can be applied
+independently, but it is recommended that you review the material in
+&lt;a href=&quot;/news/2020/01/15/demo-fraud-detection.html&quot;&gt;part one&lt;/a&gt; and
+&lt;a href=&quot;/news/2020/03/24/demo-fraud-detection-2.html&quot;&gt;part two&lt;/a&gt; of the series as well as checkout the &lt;a href=&quot;https://github.com/afedulov/fraud-detection-demo&quot;&gt;code
+base&lt;/a&gt; in order to make
+it easier to follow along.&lt;/p&gt;
+
+&lt;h2 id=&quot;processfunction-as-a-window&quot;&gt;ProcessFunction as a “Window”&lt;/h2&gt;
+
+&lt;h3 id=&quot;low-latency&quot;&gt;Low Latency&lt;/h3&gt;
+
+&lt;p&gt;Let’s start with a reminder of the type of fraud detection rule that we
+would like to support:&lt;/p&gt;
+
+&lt;p&gt;&lt;em&gt;“Whenever the &lt;strong&gt;sum&lt;/strong&gt; of  &lt;strong&gt;payments&lt;/strong&gt; from the same &lt;strong&gt;payer&lt;/strong&gt; to the
+same &lt;strong&gt;beneficiary&lt;/strong&gt; within &lt;strong&gt;a 24 hour
+period&lt;/strong&gt; is &lt;strong&gt;greater&lt;/strong&gt; than &lt;strong&gt;200 000 $&lt;/strong&gt; - trigger an alert.”&lt;/em&gt;&lt;/p&gt;
+
+&lt;p&gt;In other words, given a stream of transactions partitioned by a key that
+combines the payer and the beneficiary fields, we would like to look
+back in time and determine, for each incoming transaction, if the sum of
+all previous payments between the two specific participants exceeds the
+defined threshold. In effect, the computation window is always moved
+along to the position of the last observed event for a particular data
+partitioning key.&lt;/p&gt;
+
+&lt;center&gt;
+&lt;img src=&quot;/img/blog/patterns-blog-3/time-windows.png&quot; width=&quot;600px&quot; alt=&quot;Figure 1: Time Windows&quot; /&gt;
+&lt;br /&gt;
+&lt;i&gt;&lt;small&gt;Figure 1: Time Windows&lt;/small&gt;&lt;/i&gt;
+&lt;/center&gt;
+&lt;p&gt;&lt;br /&gt;&lt;/p&gt;
+
+&lt;p&gt;One of the common key requirements for a fraud detection system is &lt;em&gt;low
+response time&lt;/em&gt;. The sooner the fraudulent action gets detected, the
+higher the chances that it can be blocked and its negative consequences
+mitigated. This requirement is especially prominent in the financial
+domain, where you have one important constraint - any time spent
+evaluating a fraud detection model is time that a law-abiding user of
+your system will spend waiting for a response. Swiftness of processing
+often becomes a competitive advantage between various payment systems
+and the time limit for producing an alert could lie as low as &lt;em&gt;300-500
+ms&lt;/em&gt;. This is all the time you get from the moment of ingestion of a
+transaction event into a fraud detection system until an alert has to
+become available to downstream systems. &lt;/p&gt;
+
+&lt;p&gt;As you might know, Flink provides a powerful &lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html&quot;&gt;Window
+API&lt;/a&gt;
+that is applicable for a wide range of use cases. However, if you go
+over all of the available types of supported windows, you will realize
+that none of them exactly match our main requirement for this use case -
+the low-latency evaluation of &lt;em&gt;each&lt;/em&gt; incoming transaction. There is
+no type of window in Flink that can express the &lt;em&gt;“x minutes/hours/days
+back from the &lt;u&gt;current event&lt;/u&gt;”&lt;/em&gt; semantic. In the Window API, events
+fall into windows (as defined by the window
+&lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#window-assigners&quot;&gt;assigners&lt;/a&gt;),
+but they cannot themselves individually control the creation and
+evaluation of windows*. As described above, our goal for the fraud
+detection engine is to achieve immediate evaluation of the previous
+relevant data points as soon as the new event is received. This raises
+the question of feasibility of applying the Window API in this case. The Window API offers some options for defining custom triggers, evictors, and window assigners, which may get to the required result. However, it is usually difficult to get this right (and easy to break). Moreover, this approach does not provide access to broadcast state, which is required for implementing dynamic reconfiguration of business rules.&lt;/p&gt;
+
+&lt;p&gt;*) apart from the session windows, but they are limited to assignments
+based on the session &lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#session-windows&quot;&gt;gaps&lt;/a&gt;&lt;/p&gt;
+
+&lt;center&gt;
+&lt;img src=&quot;/img/blog/patterns-blog-3/evaluation-delays.png&quot; width=&quot;600px&quot; alt=&quot;Figure 2: Evaluation Delays&quot; /&gt;
+&lt;br /&gt;
+&lt;i&gt;&lt;small&gt;Figure 2: Evaluation Delays&lt;/small&gt;&lt;/i&gt;
+&lt;/center&gt;
+&lt;p&gt;&lt;br /&gt;&lt;/p&gt;
+
+&lt;p&gt;Let’s take an example of using a &lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#sliding-windows&quot;&gt;sliding
+window&lt;/a&gt;
+from Flink’s Window API. Using sliding windows with the slide of &lt;em&gt;S&lt;/em&gt;
+translates into an expected value of evaluation delay equal to &lt;em&gt;S/2.&lt;/em&gt;
+This means that you would need to define a window slide of 600-1000 ms
+to fulfill the low-latency requirement of 300-500 ms delay, even before
+taking any actual computation time into account. The fact that Flink
+stores a separate window state for each sliding window pane renders this
+approach unfeasible under any moderately high load conditions.&lt;/p&gt;
+
+&lt;p&gt;In order to satisfy the requirements, we need to create our own
+low-latency window implementation. Luckily, Flink gives us all the tools
+required to do so. &lt;code&gt;ProcessFunction&lt;/code&gt; is a low-level, but powerful
+building block in Flink&#39;s API. It has a simple contract:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;&lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;class&lt;/span&gt; &lt;span class=&quot;nc&quot;&gt;SomeProcessFunction&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;extends&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;KeyedProcessFunction&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;KeyType&lt;/span&gt;&lt [...]
+
+	&lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;processElement&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;InputType&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;event&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Context&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;ctx&lt;/span&gt;&lt;span class=&quot;o&quot; [...]
+
+	&lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;onTimer&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;timestamp&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;OnTimerContext&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;ctx&lt;/span&gt;&lt;span class=&quot;o&quot; [...]
+
+	&lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;open&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Configuration&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;parameters&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;){}&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;ul&gt;
+  &lt;li&gt;
+    &lt;p&gt;&lt;code&gt;processElement()&lt;/code&gt; receives input events one by one. You can react to
+each input by producing one or more output events to the next
+operator by calling &lt;code&gt;out.collect(someOutput)&lt;/code&gt;. You can also pass data
+to a &lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html&quot;&gt;side
+output&lt;/a&gt;
+or ignore a particular input altogether.&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;&lt;code&gt;onTimer()&lt;/code&gt; is called by Flink when a previously-registered timer
+fires. Both event time and processing time timers are supported.&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;&lt;code&gt;open()&lt;/code&gt; is equivalent to a constructor. It is called inside of the
+&lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/glossary.html#flink-taskmanager&quot;&gt;TaskManager’s&lt;/a&gt;
+JVM, and is used for initialization, such as registering
+Flink-managed state. It is also the right place to initialize fields
+that are not serializable and cannot be transferred from the
+JobManager’s JVM.&lt;/p&gt;
+  &lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;Most importantly, &lt;code&gt;ProcessFunction&lt;/code&gt; also has access to the fault-tolerant
+state, handled by Flink. This combination, together with Flink&#39;s
+message processing and delivery guarantees, makes it possible to build
+resilient event-driven applications with almost arbitrarily
+sophisticated business logic. This includes creation and processing of
+custom windows with state.&lt;/p&gt;
+
+&lt;h3 id=&quot;implementation&quot;&gt;Implementation&lt;/h3&gt;
+
+&lt;h4 id=&quot;state-and-clean-up&quot;&gt;State and Clean-up&lt;/h4&gt;
+
+&lt;p&gt;In order to be able to process time windows, we need to keep track of
+data belonging to the window inside of our program. To ensure that this
+data is fault-tolerant and can survive failures in a distributed system,
+we should store it inside of Flink-managed state. As the time
+progresses, we do not need to keep all previous transactions. According
+to the sample rule, all events that are older than 24 hours become
+irrelevant. We are looking at a window of data that constantly moves and
+where stale transactions need to be constantly moved out of scope (in
+other words, cleaned up from state).&lt;/p&gt;
+
+&lt;center&gt;
+&lt;img src=&quot;/img/blog/patterns-blog-3/window-clean-up.png&quot; width=&quot;400px&quot; alt=&quot;Figure 3: Window Clean-up&quot; /&gt;
+&lt;br /&gt;
+&lt;i&gt;&lt;small&gt;Figure 3: Window Clean-up&lt;/small&gt;&lt;/i&gt;
+&lt;/center&gt;
+&lt;p&gt;&lt;br /&gt;&lt;/p&gt;
+
+&lt;p&gt;We will
+&lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state&quot;&gt;use&lt;/a&gt;
+&lt;code&gt;MapState&lt;/code&gt; to store the individual events of the window. In order to allow
+efficient clean-up of the out-of-scope events, we will utilize event
+timestamps as the &lt;code&gt;MapState&lt;/code&gt; keys.&lt;/p&gt;
+
+&lt;p&gt;In a general case, we have to take into account the fact that there
+might be different events with exactly the same timestamp, therefore
+instead of individual Transaction per key(timestamp) we will store sets.&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;&lt;span class=&quot;n&quot;&gt;MapState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Set&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Transaction&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt [...]
+
+&lt;div class=&quot;alert alert-info&quot;&gt;
+  &lt;p&gt;&lt;span class=&quot;label label-info&quot; style=&quot;display: inline-block&quot;&gt;&lt;span class=&quot;glyphicon glyphicon-info-sign&quot; aria-hidden=&quot;true&quot;&gt;&lt;/span&gt; Side Note &lt;/span&gt;
+when any Flink-managed state is used inside a
+&lt;code&gt;KeyedProcessFunction&lt;/code&gt;, the data returned by the &lt;code&gt;state.value()&lt;/code&gt; call is
+automatically scoped by the key of the &lt;em&gt;currently-processed event&lt;/em&gt;
+- see Figure 4. If &lt;code&gt;MapState&lt;/code&gt; is used, the same principle applies, with
+the difference that a &lt;code&gt;Map&lt;/code&gt; is returned instead of &lt;code&gt;MyObject&lt;/code&gt;. If you are
+compelled to do something like
+&lt;code&gt;mapState.value().get(inputEvent.getKey())&lt;/code&gt;, you should probably be using
+&lt;code&gt;ValueState&lt;/code&gt; instead of the &lt;code&gt;MapState&lt;/code&gt;. As we want to store &lt;em&gt;multiple values
+per event key&lt;/em&gt;, in our case, &lt;code&gt;MapState&lt;/code&gt; is the right choice.&lt;/p&gt;
+
+  &lt;p&gt;&lt;br /&gt;&lt;/p&gt;
+
+  &lt;center&gt;
+&lt;img src=&quot;/img/blog/patterns-blog-3/keyed-state-scoping.png&quot; width=&quot;800px&quot; alt=&quot;Figure 4: Keyed State Scoping&quot; /&gt;
+&lt;br /&gt;
+&lt;i&gt;&lt;small&gt;Figure 4: Keyed State Scoping&lt;/small&gt;&lt;/i&gt;
+&lt;/center&gt;
+
+&lt;/div&gt;
+
+&lt;p&gt;As described in the &lt;a href=&quot;/news/2020/01/15/demo-fraud-detection.html&quot;&gt;first blog of the series&lt;/a&gt;, we are dispatching events based on the keys
+specified in the active fraud detection rules. Multiple distinct rules
+can be based on the same grouping key. This means that our alerting
+function can potentially receive transactions scoped by the same key
+(e.g. &lt;code&gt;{payerId=25;beneficiaryId=12}&lt;/code&gt;), but destined to be evaluated
+according to different rules, which implies potentially different
+lengths of the time windows. This raises the question of how we can best
+store fault-tolerant window state within the &lt;code&gt;KeyedProcessFunction&lt;/code&gt;. One
+approach would be to create and manage separate &lt;code&gt;MapStates&lt;/code&gt; per rule. Such
+an approach, however, would be wasteful - we would separately hold state
+for overlapping time windows, and therefore unnecessarily store
+duplicate events. A better approach is to always store just enough data
+to be able to evaluate all currently active rules which are scoped by
+the same key. In order to achieve that, whenever a new rule is added, we
+will determine if its time window has the largest span and store it in
+the broadcast state under the special reserved &lt;code&gt;WIDEST_RULE_KEY&lt;/code&gt;. This
+information will be used during the state clean-up procedure, as
+described later in this section.&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;&lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
+&lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;processBroadcastElement&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Rule&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Context&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;ctx&lt;/span&gt;&lt;span class=&quot;o&quo [...]
+  &lt;span class=&quot;o&quot;&gt;...&lt;/span&gt;
+  &lt;span class=&quot;n&quot;&gt;updateWidestWindowRule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;broadcastState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+
+&lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;updateWidestWindowRule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Rule&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;BroadcastState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class= [...]
+  &lt;span class=&quot;n&quot;&gt;Rule&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;widestWindowRule&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;broadcastState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;WIDEST_RULE_KEY&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+
+  &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;widestWindowRule&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;==&lt;/span&gt; &lt;span class=&quot;kc&quot;&gt;null&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;broadcastState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;put&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;WIDEST_RULE_KEY&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+    &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;;&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+
+  &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;widestWindowRule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getWindowMillis&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&g [...]
+    &lt;span class=&quot;n&quot;&gt;broadcastState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;put&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;WIDEST_RULE_KEY&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;p&gt;Let’s now look at the implementation of the main method,
+&lt;code&gt;processElement()&lt;/code&gt;, in some detail.&lt;/p&gt;
+
+&lt;p&gt;In the &lt;a href=&quot;/news/2020/01/15/demo-fraud-detection.html#dynamic-data-partitioning&quot;&gt;previous blog post&lt;/a&gt;, we described how &lt;code&gt;DynamicKeyFunction&lt;/code&gt; allowed
+us to perform dynamic data partitioning based on the &lt;code&gt;groupingKeyNames&lt;/code&gt;
+parameter in the rule definition. The subsequent description is focused
+around the &lt;code&gt;DynamicAlertFunction&lt;/code&gt;, which makes use of the remaining rule
+settings.&lt;/p&gt;
+
+&lt;center&gt;
+&lt;img src=&quot;/img/blog/patterns-blog-3/sample-rule-definition.png&quot; width=&quot;700px&quot; alt=&quot;Figure 5: Sample Rule Definition&quot; /&gt;
+&lt;br /&gt;
+&lt;i&gt;&lt;small&gt;Figure 5: Sample Rule Definition&lt;/small&gt;&lt;/i&gt;
+&lt;/center&gt;
+&lt;p&gt;&lt;br /&gt;&lt;/p&gt;
+
+&lt;p&gt;As described in the previous parts of the blog post
+series, our alerting process function receives events of type
+&lt;code&gt;Keyed&amp;lt;Transaction, String, Integer&amp;gt;&lt;/code&gt;, where &lt;code&gt;Transaction&lt;/code&gt; is the main
+“wrapped” event, &lt;code&gt;String&lt;/code&gt; is the key (&lt;em&gt;payer #x - beneficiary #y&lt;/em&gt; in
+Figure 1), and &lt;code&gt;Integer&lt;/code&gt; is the ID of the rule that caused the dispatch of
+this event. This rule was previously &lt;a href=&quot;/news/2020/03/24/demo-fraud-detection-2.html#broadcast-state-pattern&quot;&gt;stored in the broadcast state&lt;/a&gt; and has to be retrieved from that state by the ID. Here is the
+outline of the implementation:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;&lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;class&lt;/span&gt; &lt;span class=&quot;nc&quot;&gt;DynamicAlertFunction&lt;/span&gt;
+    &lt;span class=&quot;kd&quot;&gt;extends&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;KeyedBroadcastProcessFunction&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;
+        &lt;span class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Keyed&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Transaction&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span class=&quot;o&quot;&g [...]
+
+  &lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;transient&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;MapState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Set&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Transaction&lt;/span&gt;&lt;span class=&quot; [...]
+
+  &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
+  &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;processElement&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;
+      &lt;span class=&quot;n&quot;&gt;Keyed&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Transaction&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span class=&quot;o&quo [...]
+
+    &lt;span class=&quot;c1&quot;&gt;// Add Transaction to state&lt;/span&gt;
+    &lt;span class=&quot;kt&quot;&gt;long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;currentEventTime&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getWrapped&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getEventTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;                       [...]
+    &lt;span class=&quot;n&quot;&gt;addToStateValuesSet&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;windowState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;currentEventTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getWrapped&lt;/span&gt;&lt;span class= [...]
+
+    &lt;span class=&quot;c1&quot;&gt;// Calculate the aggregate value&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;Rule&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;ctx&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getBroadcastState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Descriptors&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;rul [...]
+    &lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowStartTimestampForEvent&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getWindowStartTimestampFor&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;currentEventTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span [...]
+
+    &lt;span class=&quot;n&quot;&gt;SimpleAccumulator&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;BigDecimal&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;aggregator&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;RuleHelper&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getAggregator&lt;/span&gt;& [...]
+    &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;stateEventTime&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;keys&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;())&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&l [...]
+      &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;isStateValueInWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;stateEventTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowStartForEvent&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;currentEventTime&lt;/span&gt;&lt [...]
+        &lt;span class=&quot;n&quot;&gt;aggregateValuesInState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;stateEventTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;aggregator&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+      &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+
+    &lt;span class=&quot;c1&quot;&gt;// Evaluate the rule and trigger an alert if violated&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;BigDecimal&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;aggregateResult&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;aggregator&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getLocalValue&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;                              &lt;span class=&quot;c1&quot;&gt;// &amp;lt;--- (5)&lt;/span&gt;
+    &lt;span class=&quot;kt&quot;&gt;boolean&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;isRuleViolated&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;apply&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;aggregateResult&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+    &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;isRuleViolated&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+      &lt;span class=&quot;kt&quot;&gt;long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;decisionTime&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;System&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;currentTimeMillis&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
+      &lt;span class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Alert&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getR [...]
+                              &lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt;
+                              &lt;span class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getKey&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(),&lt;/span&gt;
+                              &lt;span class=&quot;n&quot;&gt;decisionTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt;
+                              &lt;span class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getWrapped&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(),&lt;/span&gt;
+                              &lt;span class=&quot;n&quot;&gt;aggregateResult&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+
+    &lt;span class=&quot;c1&quot;&gt;// Register timers to ensure state cleanup&lt;/span&gt;
+    &lt;span class=&quot;kt&quot;&gt;long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;cleanupTime&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;currentEventTime&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;/&lt;/span&gt; &lt;span class=&quot;mi&quot;&gt;1000&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;*&lt;/span&gt; &lt;span class=&quot;mi&quot;&gt;1 [...]
+    &lt;span class=&quot;n&quot;&gt;ctx&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;timerService&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;registerEventTimeTimer&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;cleanupTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;p&gt;&lt;br /&gt;
+Here are the details of the steps:&lt;br /&gt;
+1)  We first add each new event to our window state:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;&lt;span class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;K&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;V&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Set&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&g [...]
+      &lt;span class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;Set&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;V&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;valuesSet&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;mapState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt; [...]
+    &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;valuesSet&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;!=&lt;/span&gt; &lt;span class=&quot;kc&quot;&gt;null&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+      &lt;span class=&quot;n&quot;&gt;valuesSet&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;add&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;else&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+      &lt;span class=&quot;n&quot;&gt;valuesSet&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;HashSet&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
+      &lt;span class=&quot;n&quot;&gt;valuesSet&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;add&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;mapState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;put&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;key&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;valuesSet&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+    &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;valuesSet&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;;&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;p&gt;2) Next, we retrieve the previously-broadcasted rule, according to
+    which the incoming transaction needs to be evaluated.&lt;/p&gt;
+
+&lt;p&gt;3) &lt;code&gt;getWindowStartTimestampFor&lt;/code&gt; determines how far back in time our
+    evaluation should reach, given the window span defined in the rule and the
+    current transaction timestamp.&lt;/p&gt;
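+
+&lt;p&gt;A minimal sketch of what such a method on the &lt;code&gt;Rule&lt;/code&gt; class could look like
+(the body below is an illustrative assumption; the actual project code may differ):&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;// Hypothetical sketch: the window reaches back windowMillis from the current event.
+public Long getWindowStartTimestampFor(Long currentEventTime) {
+  return currentEventTime - getWindowMillis();
+}&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;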
+
+&lt;p&gt;4) The aggregate value is calculated by iterating over all window state
+    entries and applying an aggregate function. It could be an &lt;em&gt;average,
+    max, min&lt;/em&gt; or, as in the example rule from the beginning of this
+    section, a &lt;em&gt;sum&lt;/em&gt;.&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;&lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;boolean&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;isStateValueInWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;stateEventTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowStartForEvent&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;currentEventTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span cla [...]
+  &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;stateEventTime&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;&amp;gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowStartForEvent&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;stateEventTime&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;&amp;lt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;currentEventTime&lt;/span&gt;&lt;span class=&qu [...]
+&lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+
+&lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;aggregateValuesInState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;stateEventTime&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;SimpleAccumulator&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;BigDecimal&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;aggregator&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;sp [...]
+  &lt;span class=&quot;n&quot;&gt;Set&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Transaction&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;inWindow&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span class=&quot;o&quot [...]
+  &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Transaction&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;event&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;inWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;BigDecimal&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;aggregatedValue&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt;
+        &lt;span class=&quot;n&quot;&gt;FieldsExtractor&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getBigDecimalByName&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;rule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getAggregateFieldName&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;event&lt;/span&gt;&lt;sp [...]
+    &lt;span class=&quot;n&quot;&gt;aggregator&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;add&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;aggregatedValue&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;p&gt;5) Having calculated the aggregate value, we can compare it to the threshold
+    that is specified in the rule definition and fire an alert, if
+    necessary.&lt;/p&gt;
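+
+&lt;p&gt;As a rough illustration of this step, &lt;code&gt;Rule#apply&lt;/code&gt; can be thought of as a
+comparison of the aggregate against the limit defined in the rule. The accessor and enum names
+below are assumptions, not necessarily those used in the project:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;// Hypothetical sketch of the rule evaluation.
+public boolean apply(BigDecimal comparisonValue) {
+  switch (getLimitOperator()) {
+    case GREATER:       return comparisonValue.compareTo(getLimit()) &amp;gt; 0;
+    case LESS:          return comparisonValue.compareTo(getLimit()) &amp;lt; 0;
+    case GREATER_EQUAL: return comparisonValue.compareTo(getLimit()) &amp;gt;= 0;
+    case LESS_EQUAL:    return comparisonValue.compareTo(getLimit()) &amp;lt;= 0;
+    default:
+      throw new IllegalStateException(&quot;Unknown limit operator&quot;);
+  }
+}&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;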
+
+&lt;p&gt;6) At the end, we register a clean-up timer using
+&lt;code&gt;ctx.timerService().registerEventTimeTimer()&lt;/code&gt;. This timer will be
+    responsible for removing the current transaction once it moves out of
+    scope.&lt;/p&gt;
+
+&lt;div class=&quot;alert alert-info&quot;&gt;
+  &lt;p&gt;&lt;span class=&quot;label label-info&quot; style=&quot;display: inline-block&quot;&gt;&lt;span class=&quot;glyphicon glyphicon-info-sign&quot; aria-hidden=&quot;true&quot;&gt;&lt;/span&gt;  Note &lt;/span&gt;
+Notice the rounding during timer creation. It is an important technique
+which enables a reasonable trade-off between the precision with which
+the timers will be triggered, and the number of timers being used.
+Timers are stored in Flink’s fault-tolerant state, and managing them
+with millisecond-level precision can be wasteful. In our case, with this
+rounding, we will create at most one timer per key in any given second. Flink documentation provides some additional &lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#timer-coalescing&quot;&gt;&lt;u&gt;details&lt;/u&gt;&lt;/a&gt;.&lt;/p&gt;
+&lt;/div&gt;
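+
+&lt;p&gt;To make the rounding concrete, the clean-up timer registration at the end of
+&lt;code&gt;processElement()&lt;/code&gt; can be sketched as follows (a sketch assuming the timer is set
+one full window span after the event time, rounded down to full seconds):&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;// Rounding down to full seconds means at most one timer per key within any given second.
+long cleanupTime = (currentEventTime / 1000) * 1000 + rule.getWindowMillis();
+ctx.timerService().registerEventTimeTimer(cleanupTime);&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;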
+
+&lt;p&gt;7) The &lt;code&gt;onTimer&lt;/code&gt; method will trigger the clean-up of the window state.&lt;/p&gt;
+
+&lt;p&gt;As previously described, we are always keeping as many events in the
+state as required for the evaluation of the active rule with the widest
+window span. This means that during the clean-up, we only need to remove
+the state which is out of scope of this widest window.&lt;/p&gt;
+
+&lt;center&gt;
+&lt;img src=&quot;/img/blog/patterns-blog-3/widest-window.png&quot; width=&quot;800px&quot; alt=&quot;Figure 6: Widest Window&quot; /&gt;
+&lt;br /&gt;
+&lt;i&gt;&lt;small&gt;Figure 6: Widest Window&lt;/small&gt;&lt;/i&gt;
+&lt;/center&gt;
+&lt;p&gt;&lt;br /&gt;&lt;/p&gt;
+
+&lt;p&gt;This is how the clean-up procedure can be implemented:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;&lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
+&lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;onTimer&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;timestamp&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;O [...]
+    &lt;span class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+
+  &lt;span class=&quot;n&quot;&gt;Rule&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;widestWindowRule&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;ctx&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;getBroadcastState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Descriptors&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&qu [...]
+
+  &lt;span class=&quot;n&quot;&gt;Optional&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;cleanupEventTimeWindow&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt;
+      &lt;span class=&quot;n&quot;&gt;Optional&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;ofNullable&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;widestWindowRule&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;map&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;nl&quot;&gt;Rule:&lt;/span&gt;&lt;span class=&quot;o&quot;&gt [...]
+  &lt;span class=&quot;n&quot;&gt;Optional&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;cleanupEventTimeThreshold&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt;
+      &lt;span class=&quot;n&quot;&gt;cleanupEventTimeWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;map&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;window&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;-&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;timestamp&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;-&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;window&lt;/span&gt;&lt;span class=&qu [...]
+  &lt;span class=&quot;c1&quot;&gt;// Remove events that are older than (timestamp - widestWindowSpan)ms&lt;/span&gt;
+  &lt;span class=&quot;n&quot;&gt;cleanupEventTimeThreshold&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;ifPresent&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;this&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;::&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;evictOutOfScopeElementsFromWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+
+&lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;evictOutOfScopeElementsFromWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;threshold&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+  &lt;span class=&quot;k&quot;&gt;try&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;Iterator&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;keys&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowState&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;keys&lt;/span&gt;&lt;span class=&quot;o&quot;&g [...]
+    &lt;span class=&quot;k&quot;&gt;while&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;keys&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;hasNext&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;())&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+      &lt;span class=&quot;n&quot;&gt;Long&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;stateEventTime&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;keys&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;next&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
+      &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;stateEventTime&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;threshold&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+        &lt;span class=&quot;n&quot;&gt;keys&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;na&quot;&gt;remove&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;();&lt;/span&gt;
+      &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;catch&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;ex&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;{&lt;/span&gt;
+    &lt;span class=&quot;k&quot;&gt;throw&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span class=&quot;nf&quot;&gt;RuntimeException&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;ex&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+  &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;div class=&quot;alert alert-info&quot;&gt;
+  &lt;p&gt;&lt;span class=&quot;label label-info&quot; style=&quot;display: inline-block&quot;&gt;&lt;span class=&quot;glyphicon glyphicon-info-sign&quot; aria-hidden=&quot;true&quot;&gt;&lt;/span&gt;  Note&lt;/span&gt;
+You might be wondering why we did not use &lt;code&gt;ListState&lt;/code&gt;, given that we are always
+iterating over all of the values of the window state. This is actually
+an optimization for the case when &lt;code&gt;RocksDBStateBackend&lt;/code&gt;
+&lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#the-rocksdbstatebackend&quot;&gt;is used&lt;/a&gt;. Iterating over a &lt;code&gt;ListState&lt;/code&gt; would cause all of the &lt;code&gt;Transaction&lt;/code&gt;
+objects to be deserialized. Using &lt;code&gt;MapState&lt;/code&gt;&#39;s keys iterator only causes
+deserialization of the keys (type &lt;code&gt;long&lt;/code&gt;), and therefore reduces the
+computational overhead.&lt;/p&gt;
+&lt;/div&gt;
+
+&lt;p&gt;This concludes the description of the implementation details. Our
+approach triggers evaluation of a time window as soon as a new
+transaction arrives. It therefore fulfills the main requirement that we
+have targeted - low delay for potentially issuing an alert. For the
+complete implementation, please have a look at
+&lt;a href=&quot;https://github.com/afedulov/fraud-detection-demo&quot;&gt;the project on GitHub&lt;/a&gt;.&lt;/p&gt;
+
+&lt;h2 id=&quot;improvements-and-optimizations&quot;&gt;Improvements and Optimizations&lt;/h2&gt;
+
+&lt;p&gt;What are the pros and cons of the described approach?&lt;/p&gt;
+
+&lt;p&gt;&lt;strong&gt;Pros:&lt;/strong&gt;&lt;/p&gt;
+
+&lt;ul&gt;
+  &lt;li&gt;
+    &lt;p&gt;Low latency capabilities&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Tailored solution with potential use-case specific optimizations&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Efficient state reuse (shared state for the rules with the same key)&lt;/p&gt;
+  &lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;&lt;strong&gt;Cons:&lt;/strong&gt;&lt;/p&gt;
+
+&lt;ul&gt;
+  &lt;li&gt;
+    &lt;p&gt;Cannot make use of potential future optimizations in the existing
+Window API&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;No late event handling, which is available out of the box in the
+Window API&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Quadratic computation complexity and potentially large state&lt;/p&gt;
+  &lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;Let’s now look at the latter two drawbacks and see if we can address
+them.&lt;/p&gt;
+
+&lt;h4 id=&quot;late-events&quot;&gt;Late Events&lt;/h4&gt;
+
+&lt;p&gt;Processing late events poses a question - is it still meaningful
+to re-evaluate the window upon the arrival of a late event? If it is, you
+need to extend the widest window used for the
+clean-up by your maximum expected out-of-orderness. This avoids
+having potentially incomplete time window data for such late firings
+(see Figure 7).&lt;/p&gt;
+
+&lt;center&gt;
+&lt;img src=&quot;/img/blog/patterns-blog-3/late-events.png&quot; width=&quot;500px&quot; alt=&quot;Figure 7: Late Events Handling&quot; /&gt;
+&lt;br /&gt;
+&lt;i&gt;&lt;small&gt;Figure 7: Late Events Handling&lt;/small&gt;&lt;/i&gt;
+&lt;/center&gt;
+&lt;p&gt;&lt;br /&gt;&lt;/p&gt;
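+
+&lt;p&gt;In terms of the &lt;code&gt;onTimer()&lt;/code&gt; logic shown earlier, this boils down to subtracting
+the allowed out-of-orderness when computing the eviction threshold. A minimal sketch, with a
+hypothetical example value for the maximum expected out-of-orderness:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;// Assumption for illustration: events arrive at most one minute out of order.
+private static final long MAX_OUT_OF_ORDERNESS_MS = 60 * 1000;
+
+// Inside onTimer(): retain events for an additional MAX_OUT_OF_ORDERNESS_MS.
+Optional&amp;lt;Long&amp;gt; cleanupEventTimeThreshold =
+    cleanupEventTimeWindow.map(window -&amp;gt; timestamp - window - MAX_OUT_OF_ORDERNESS_MS);&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;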
+
+&lt;p&gt;It can be argued, however, that for a use case that puts emphasis on low
+latency processing, such late triggering would be meaningless. In this
+case, we could keep track of the most recent timestamp that we have
+observed so far, and for events that do not monotonically increase this
+value, only add them to the state and skip the aggregate calculation and
+the alert triggering logic.&lt;/p&gt;
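+
+&lt;p&gt;A minimal sketch of this idea, assuming an additional &lt;code&gt;ValueState&amp;lt;Long&amp;gt;&lt;/code&gt;
+(here called &lt;code&gt;lastObservedEventTime&lt;/code&gt;, a hypothetical name) is registered next to the
+window state:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;// Keep late events in state, but skip re-evaluation and alerting for them.
+Long lastObserved = lastObservedEventTime.value();
+addToStateValuesSet(windowState, currentEventTime, value.getWrapped());
+if (lastObserved == null || currentEventTime &amp;gt; lastObserved) {
+  lastObservedEventTime.update(currentEventTime);
+  // ... proceed with the aggregation, rule evaluation and timer registration as before
+} else {
+  return; // late event: stored for future evaluations, no re-evaluation or alert now
+}&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;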
+
+&lt;h4 id=&quot;redundant-re-computations-and-state-size&quot;&gt;Redundant Re-computations and State Size&lt;/h4&gt;
+
+&lt;p&gt;In the described implementation, we keep individual transactions in state
+and iterate over them to recalculate the aggregate on every new
+event. This is clearly suboptimal, as it wastes computational
+resources on repeated calculations.&lt;/p&gt;
+
+&lt;p&gt;What is the main reason to keep the individual transactions in state?
+The granularity of stored events directly corresponds to the precision
+of the time window calculation. Because we store transactions
+individually, we can precisely ignore individual transactions as soon as
+they leave the exact 2592000000 ms time window (30 days in ms). At this
+point, it is worth raising the question - do we really need
+millisecond precision when evaluating such a long time window, or is it
+acceptable to tolerate potential false positives in exceptional cases? If the
+answer for your use case is that such precision is not needed, you could
+implement additional optimization based on bucketing and
+pre-aggregation. The idea of this optimization can be broken down as
+follows:&lt;/p&gt;
+
+&lt;ul&gt;
+  &lt;li&gt;
+    &lt;p&gt;Instead of storing individual events, create a parent class that can
+contain either the fields of a single transaction or combined values
+calculated by applying an aggregate function to a set of
+transactions.&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Instead of using timestamps in milliseconds as &lt;code&gt;MapState&lt;/code&gt; keys, round
+them to the level of “resolution” that you are willing to accept
+(for instance, a full minute). Each entry therefore represents a
+bucket.&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Whenever a window is evaluated, append the new transaction’s data to
+the bucket aggregate instead of storing individual data points per
+transaction.&lt;/p&gt;
+  &lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;center&gt;
+&lt;img src=&quot;/img/blog/patterns-blog-3/pre-aggregation.png&quot; width=&quot;700px&quot; alt=&quot;Figure 8: Pre-aggregation&quot; /&gt;
+&lt;br /&gt;
+&lt;i&gt;&lt;small&gt;Figure 8: Pre-aggregation&lt;/small&gt;&lt;/i&gt;
+&lt;/center&gt;
+&lt;p&gt;&lt;br /&gt;&lt;/p&gt;
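+
+&lt;p&gt;To make the bucketing idea a bit more tangible, here is a minimal sketch, assuming a
+&lt;em&gt;sum&lt;/em&gt; aggregation and one-minute buckets (the state layout and names are illustrative
+assumptions, not taken from the project):&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;// Pre-aggregated buckets, keyed by the event timestamp rounded down to a full minute.
+private static final long BUCKET_SPAN_MS = 60 * 1000;
+private transient MapState&amp;lt;Long, BigDecimal&amp;gt; bucketedSums;
+
+private void addToBucket(long eventTime, BigDecimal amount) throws Exception {
+  long bucketKey = (eventTime / BUCKET_SPAN_MS) * BUCKET_SPAN_MS;
+  BigDecimal current = bucketedSums.get(bucketKey);
+  bucketedSums.put(bucketKey, current == null ? amount : current.add(amount));
+}&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;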
+
+&lt;h4 id=&quot;state-data-and-serializers&quot;&gt;State Data and Serializers&lt;/h4&gt;
+
+&lt;p&gt;Another question that we can ask ourselves in order to further optimize
+the implementation is how probable it is to get different events with
+exactly the same timestamp. In the described implementation, we
+demonstrated one way of approaching this question by storing sets of
+transactions per timestamp in &lt;code&gt;MapState&amp;lt;Long, Set&amp;lt;Transaction&amp;gt;&amp;gt;&lt;/code&gt;. Such
+a choice, however, might have a more significant effect on performance
+than might be anticipated. The reason is that Flink does not currently
+provide a native &lt;code&gt;Set&lt;/code&gt; serializer and will enforce a fallback to the less
+efficient &lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/types_serialization.html#general-class-types&quot;&gt;Kryo
+serializer&lt;/a&gt;
+instead
+(&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-16729&quot;&gt;FLINK-16729&lt;/a&gt;). A
+meaningful alternative strategy is to assume that, in a normal scenario,
+no two distinct events can have exactly the same timestamp and to turn
+the window state into a &lt;code&gt;MapState&amp;lt;Long, Transaction&amp;gt;&lt;/code&gt; type. You can use
+&lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html&quot;&gt;side-outputs&lt;/a&gt;
+to collect and monitor any unexpected occurrences which contradict your
+assumption. During performance optimizations, I generally recommend that you
+&lt;a href=&quot;https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#disabling-kryo&quot;&gt;disable the fallback to
+Kryo&lt;/a&gt;
+and verify where your application might be further optimized by ensuring
+that &lt;a href=&quot;https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#performance-comparison&quot;&gt;more efficient
+serializers&lt;/a&gt;
+are being used.&lt;/p&gt;
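+
+&lt;p&gt;A quick way to surface accidental Kryo fallbacks during development is to disable generic
+types on the &lt;code&gt;ExecutionConfig&lt;/code&gt;, which makes the job fail fast whenever a type
+would otherwise fall back to Kryo:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// Fail instead of silently falling back to the generic Kryo serializer.
+env.getConfig().disableGenericTypes();&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;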
+
+&lt;div class=&quot;alert alert-info&quot;&gt;
+  &lt;p&gt;&lt;span class=&quot;label label-info&quot; style=&quot;display: inline-block&quot;&gt;&lt;span class=&quot;glyphicon glyphicon-info-sign&quot; aria-hidden=&quot;true&quot;&gt;&lt;/span&gt;  Tip:&lt;/span&gt;
+you can quickly determine which serializer is going to be
+used for your classes by setting a breakpoint and verifying the type of
+the returned &lt;code&gt;TypeInformation&lt;/code&gt;.
+&lt;br /&gt;&lt;/p&gt;
+
+  &lt;center&gt;
+&lt;table class=&quot;tg&quot;&gt;
+  &lt;tr&gt;
+    &lt;td class=&quot;tg-topcenter&quot;&gt;
+      &lt;img src=&quot;/img/blog/patterns-blog-3/type-pojo.png&quot; alt=&quot;POJO&quot; /&gt;&lt;/td&gt;
+    &lt;td class=&quot;tg-topcenter&quot;&gt;
+      &lt;i&gt;PojoTypeInfo&lt;/i&gt; indicates that an efficient Flink POJO serializer will be used.&lt;/td&gt;
+  &lt;/tr&gt;
+  &lt;tr&gt;
+    &lt;td class=&quot;tg-top&quot;&gt;
+      &lt;img src=&quot;/img/blog/patterns-blog-3/type-kryo.png&quot; alt=&quot;Kryo&quot; /&gt;&lt;/td&gt;
+    &lt;td class=&quot;tg-topcenter&quot;&gt;
+      &lt;i&gt;GenericTypeInfo&lt;/i&gt; indicates the fallback to a Kryo serializer.&lt;/td&gt;
+  &lt;/tr&gt;
+&lt;/table&gt;
+&lt;/center&gt;
+&lt;/div&gt;
+
+&lt;p&gt;&lt;strong&gt;Event pruning&lt;/strong&gt;: instead of storing complete events and putting
+additional stress on the ser/de machinery, we can reduce the data of individual
+events to only the relevant information. This would potentially require
+“unpacking” individual events as fields, and storing those fields into a
+generic &lt;code&gt;Map&amp;lt;String, Object&amp;gt;&lt;/code&gt; data structure, based on the
+configurations of active rules.&lt;/p&gt;
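+
+&lt;p&gt;A hypothetical sketch of such pruning (the extraction helper and the source of the
+referenced field names are assumptions made for illustration):&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-java&quot;&gt;// Keep only the fields that the currently active rules actually reference.
+Map&amp;lt;String, Object&amp;gt; prunedEvent = new HashMap&amp;lt;&amp;gt;();
+for (String fieldName : fieldsReferencedByActiveRules) {
+  prunedEvent.put(fieldName, FieldsExtractor.getByName(fieldName, value.getWrapped()));
+}&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;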
+
+&lt;p&gt;While this adjustment could potentially produce significant improvements
+for objects of large size, it should not be your first pick as it can
+easily turn into a premature optimization.&lt;/p&gt;
+
+&lt;h2 id=&quot;summary&quot;&gt;Summary&lt;/h2&gt;
+
+&lt;p&gt;This article concludes the description of the implementation of the
+fraud detection engine that we started in &lt;a href=&quot;/news/2020/01/15/demo-fraud-detection.html&quot;&gt;part one&lt;/a&gt;. In this blog
+post, we demonstrated how &lt;code&gt;ProcessFunction&lt;/code&gt; can be utilized to
+&quot;impersonate&quot; a window with sophisticated custom logic. We
+discussed the pros and cons of such an approach and elaborated on how custom,
+use-case-specific optimizations can be applied - something that would
+not be directly possible with the Window API.&lt;/p&gt;
+
+&lt;p&gt;The goal of this blog post was to illustrate the power and flexibility
+of Apache Flink’s APIs. At its core are the pillars of Flink, which
+spare you, as a developer, a significant amount of work and
+generalize well to a wide range of use cases by providing:&lt;/p&gt;
+
+&lt;ul&gt;
+  &lt;li&gt;
+    &lt;p&gt;Efficient data exchange in a distributed cluster&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Horizontal scalability via data partitioning&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Fault-tolerant state with quick, local access&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Convenient abstraction for working with this state, which is as simple as using a
+local variable&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Multi-threaded, parallel execution engine. &lt;code&gt;ProcessFunction&lt;/code&gt; code runs
+in a single thread, without the need for synchronization. Flink
+handles all the parallel execution aspects and correct access to the
+shared state, without you, as a developer, having to think about it
+(concurrency is hard).&lt;/p&gt;
+  &lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;All these aspects make it possible to build applications with Flink that
+go well beyond trivial streaming ETL use cases and enable implementation
+of arbitrarily sophisticated, distributed event-driven applications.
+With Flink, you can rethink approaches to a wide range of use cases
+which normally would rely on using stateless parallel execution nodes
+and “pushing” the concerns of state fault tolerance to a database, an
+approach that is often destined to run into scalability issues in the
+face of ever-increasing data volumes.&lt;/p&gt;
+</description>
+<pubDate>Thu, 30 Jul 2020 14:00:00 +0200</pubDate>
+<link>https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html</link>
+<guid isPermaLink="true">/news/2020/07/30/demo-fraud-detection-3.html</guid>
+</item>
+
+<item>
 <title>Flink SQL Demo: Building an End-to-End Streaming Application</title>
 <description>&lt;p&gt;Apache Flink 1.11 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.&lt;/p&gt;
 
@@ -289,6 +954,240 @@ As the maximum time is also a part of the primary key of the sink, the final res
 </item>
 
 <item>
+<title>Flink Community Update - July&#39;20</title>
+<description>&lt;p&gt;As July draws to an end, we look back at a monthful of activity in the Flink community, including two releases (!) and some work around improving the first-time contribution experience in the project.&lt;/p&gt;
+
+&lt;p&gt;Also, events are starting to pick up again, so we’ve put together a list of some great ones you can (virtually) attend in August!&lt;/p&gt;
+
+&lt;div class=&quot;page-toc&quot;&gt;
+&lt;ul id=&quot;markdown-toc&quot;&gt;
+  &lt;li&gt;&lt;a href=&quot;#the-past-month-in-flink&quot; id=&quot;markdown-toc-the-past-month-in-flink&quot;&gt;The Past Month in Flink&lt;/a&gt;    &lt;ul&gt;
+      &lt;li&gt;&lt;a href=&quot;#flink-releases&quot; id=&quot;markdown-toc-flink-releases&quot;&gt;Flink Releases&lt;/a&gt;        &lt;ul&gt;
+          &lt;li&gt;&lt;a href=&quot;#flink-111&quot; id=&quot;markdown-toc-flink-111&quot;&gt;Flink 1.11&lt;/a&gt;&lt;/li&gt;
+          &lt;li&gt;&lt;a href=&quot;#flink-1111&quot; id=&quot;markdown-toc-flink-1111&quot;&gt;Flink 1.11.1&lt;/a&gt;&lt;/li&gt;
+        &lt;/ul&gt;
+      &lt;/li&gt;
+      &lt;li&gt;&lt;a href=&quot;#gearing-up-for-flink-112&quot; id=&quot;markdown-toc-gearing-up-for-flink-112&quot;&gt;Gearing up for Flink 1.12&lt;/a&gt;&lt;/li&gt;
+      &lt;li&gt;&lt;a href=&quot;#new-committers-and-pmc-members&quot; id=&quot;markdown-toc-new-committers-and-pmc-members&quot;&gt;New Committers and PMC Members&lt;/a&gt;        &lt;ul&gt;
+          &lt;li&gt;&lt;a href=&quot;#new-pmc-members&quot; id=&quot;markdown-toc-new-pmc-members&quot;&gt;New PMC Members&lt;/a&gt;&lt;/li&gt;
+        &lt;/ul&gt;
+      &lt;/li&gt;
+    &lt;/ul&gt;
+  &lt;/li&gt;
+  &lt;li&gt;&lt;a href=&quot;#the-bigger-picture&quot; id=&quot;markdown-toc-the-bigger-picture&quot;&gt;The Bigger Picture&lt;/a&gt;    &lt;ul&gt;
+      &lt;li&gt;&lt;a href=&quot;#a-look-into-the-evolution-of-flink-releases&quot; id=&quot;markdown-toc-a-look-into-the-evolution-of-flink-releases&quot;&gt;A Look Into the Evolution of Flink Releases&lt;/a&gt;&lt;/li&gt;
+      &lt;li&gt;&lt;a href=&quot;#first-time-contributor-guide&quot; id=&quot;markdown-toc-first-time-contributor-guide&quot;&gt;First-time Contributor Guide&lt;/a&gt;&lt;/li&gt;
+      &lt;li&gt;&lt;a href=&quot;#replacing-charged-words-in-the-flink-repo&quot; id=&quot;markdown-toc-replacing-charged-words-in-the-flink-repo&quot;&gt;Replacing “charged” words in the Flink repo&lt;/a&gt;&lt;/li&gt;
+    &lt;/ul&gt;
+  &lt;/li&gt;
+  &lt;li&gt;&lt;a href=&quot;#upcoming-events-and-more&quot; id=&quot;markdown-toc-upcoming-events-and-more&quot;&gt;Upcoming Events (and More!)&lt;/a&gt;&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;/div&gt;
+
+&lt;h1 id=&quot;the-past-month-in-flink&quot;&gt;The Past Month in Flink&lt;/h1&gt;
+
+&lt;h2 id=&quot;flink-releases&quot;&gt;Flink Releases&lt;/h2&gt;
+
+&lt;h3 id=&quot;flink-111&quot;&gt;Flink 1.11&lt;/h3&gt;
+
+&lt;p&gt;A couple of weeks ago, Flink 1.11 was announced in what was (again) the biggest Flink release to date (&lt;em&gt;see &lt;a href=&quot;#a-look-into-the-evolution-of-flink-releases&quot;&gt;“A Look Into the Evolution of Flink Releases”&lt;/a&gt;&lt;/em&gt;)! The new release brought significant improvements to usability as well as new features to Flink users across the API stack. Some highlights of Flink 1.11 are:&lt;/p&gt;
+
+&lt;ul&gt;
+  &lt;li&gt;
+    &lt;p&gt;Unaligned checkpoints to cope with high backpressure scenarios;&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;The new source API, that simplifies and unifies the implementation of (custom) sources;&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Support for Change Data Capture (CDC) and other common use cases in the Table API/SQL;&lt;/p&gt;
+  &lt;/li&gt;
+  &lt;li&gt;
+    &lt;p&gt;Pandas UDFs and other performance optimizations in PyFlink, making it more powerful for data science and ML workloads.&lt;/p&gt;
+  &lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;For a more detailed look into the release, you can recap the &lt;a href=&quot;https://flink.apache.org/news/2020/07/06/release-1.11.0.html&quot;&gt;announcement blogpost&lt;/a&gt; and join the upcoming meetup on &lt;a href=&quot;https://www.meetup.com/seattle-flink/events/271922632/&quot;&gt;“What’s new in Flink 1.11?”&lt;/a&gt;, where you’ll be able to ask anything release-related to Aljoscha Krettek (Flink PMC Member). The community has also been working on a series of blogpos [...]
+
+&lt;h3 id=&quot;flink-1111&quot;&gt;Flink 1.11.1&lt;/h3&gt;
+
+&lt;p&gt;Shortly after releasing Flink 1.11, the community announced the first patch version to cover some outstanding issues in the major release. This version is &lt;strong&gt;particularly important for users of the Table API/SQL&lt;/strong&gt;, as it addresses known limitations that affect the usability of new features like changelog sources and support for JDBC catalogs.&lt;/p&gt;
+
+&lt;p&gt;You can find a detailed list with all the improvements and bugfixes that went into Flink 1.11.1 in the &lt;a href=&quot;https://flink.apache.org/news/2020/07/21/release-1.11.1.html&quot;&gt;announcement blogpost&lt;/a&gt;.&lt;/p&gt;
+
+&lt;hr /&gt;
+
+&lt;h2 id=&quot;gearing-up-for-flink-112&quot;&gt;Gearing up for Flink 1.12&lt;/h2&gt;
+
+&lt;p&gt;The Flink 1.12 release cycle has been kicked-off last week and a discussion about what features will go into the upcoming release is underway in &lt;a href=&quot;https://lists.apache.org/thread.html/rb01160c7c9c26304a7665f9a252d4ed1583173620df307015c095fcf%40%3Cdev.flink.apache.org%3E&quot;&gt;this @dev Mailing List thread&lt;/a&gt;. While we wait for more of these ideas to turn into proposals and JIRA issues, here are some recent FLIPs that are already being discussed in the Fl [...]
+
+&lt;table class=&quot;table table-bordered&quot;&gt;
+  &lt;thead&gt;
+    &lt;tr&gt;
+      &lt;th&gt;FLIP&lt;/th&gt;
+      &lt;th&gt;&lt;/th&gt;
+    &lt;/tr&gt;
+  &lt;/thead&gt;
+  &lt;tbody&gt;
+    &lt;tr&gt;
+      &lt;td&gt;&lt;a href=&quot;https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866298&quot;&gt;FLIP-130&lt;/a&gt;&lt;/td&gt;
+        &lt;td&gt;&lt;ul&gt;
+        &lt;li&gt;&lt;b&gt;Support Python DataStream API&lt;/b&gt;&lt;/li&gt;
+        &lt;p&gt;Python support in Flink has so far been bounded to the Table API/SQL. These APIs are high-level and convenient, but have some limitations for more complex stream processing use cases. To expand the usability of PyFlink to a broader set of use cases, FLIP-130 proposes to support it also in the DataStream API, starting with stateless operations.&lt;/p&gt;
+      &lt;/ul&gt;
+      &lt;/td&gt;
+    &lt;/tr&gt;
+    &lt;tr&gt;
+      &lt;td&gt;&lt;a href=&quot;https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL&quot;&gt;FLIP-132&lt;/a&gt;&lt;/td&gt;
+        &lt;td&gt;&lt;ul&gt;
+        &lt;li&gt;&lt;b&gt;Temporal Table DDL&lt;/b&gt;&lt;/li&gt;
+        &lt;p&gt;Flink SQL users can&#39;t currently create temporal tables using SQL DDL, which forces them to change context frequently for use cases that require them. FLIP-132 proposes to extend the DDL syntax to support temporal tables, which in turn will allow to also bring &lt;a href=&quot;https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#join-with-a-temporal-table&quot;&gt;temporal joins&lt;/a&gt; with changelog sources to Flink SQL.&lt;/p&gt;
+      &lt;/ul&gt;
+      &lt;/td&gt;
+    &lt;/tr&gt;
+  &lt;/tbody&gt;
+&lt;/table&gt;
+
+&lt;hr /&gt;
+
+&lt;h2 id=&quot;new-committers-and-pmc-members&quot;&gt;New Committers and PMC Members&lt;/h2&gt;
+
+&lt;p&gt;The Apache Flink community has welcomed &lt;strong&gt;2 new PMC Members&lt;/strong&gt; since the last update. Congratulations!&lt;/p&gt;
+
+&lt;h3 id=&quot;new-pmc-members&quot;&gt;New PMC Members&lt;/h3&gt;
+
+&lt;div class=&quot;row&quot;&gt;
+  &lt;div class=&quot;col-lg-3&quot;&gt;
+    &lt;div class=&quot;text-center&quot;&gt;
+      &lt;img class=&quot;img-circle&quot; src=&quot;https://avatars0.githubusercontent.com/u/8957547?s=400&amp;amp;u=4560f775da9ebc5f3aa2e1563f57cdad03862ce8&amp;amp;v=4&quot; width=&quot;90&quot; height=&quot;90&quot; /&gt;
+      &lt;p&gt;&lt;a href=&quot;https://twitter.com/PiotrNowojski&quot;&gt;Piotr Nowojski&lt;/a&gt;&lt;/p&gt;
+    &lt;/div&gt;
+  &lt;/div&gt;
+  &lt;div class=&quot;col-lg-3&quot;&gt;
+    &lt;div class=&quot;text-center&quot;&gt;
+      &lt;img class=&quot;img-circle&quot; src=&quot;https://avatars0.githubusercontent.com/u/6239804?s=460&amp;amp;u=6cd81b1ab38fcc6a5736fcfa957c51093bf060e2&amp;amp;v=4&quot; width=&quot;90&quot; height=&quot;90&quot; /&gt;
+      &lt;p&gt;&lt;a href=&quot;https://twitter.com/LiyuApache&quot;&gt;Yu Li&lt;/a&gt;&lt;/p&gt;
+    &lt;/div&gt;
+  &lt;/div&gt;
+&lt;/div&gt;
+
+&lt;hr /&gt;
+
+&lt;h1 id=&quot;the-bigger-picture&quot;&gt;The Bigger Picture&lt;/h1&gt;
+
+&lt;h2 id=&quot;a-look-into-the-evolution-of-flink-releases&quot;&gt;A Look Into the Evolution of Flink Releases&lt;/h2&gt;
+
+&lt;p&gt;It’s &lt;a href=&quot;https://flink.apache.org/news/2020/04/01/community-update.html#a-look-into-the-flink-repository&quot;&gt;been a while&lt;/a&gt; since we had a look at community numbers, so this time we’d like to shed some light on the evolution of contributors and, well, work across releases. Let’s have a look at some &lt;em&gt;git&lt;/em&gt; data:&lt;/p&gt;
+
+&lt;div style=&quot;line-height:60%;&quot;&gt;
+    &lt;br /&gt;
+&lt;/div&gt;
+
+&lt;center&gt;
+&lt;img src=&quot;/img/blog/2020-07-29-community-update/2020-07-29_releases.png&quot; width=&quot;600px&quot; alt=&quot;Flink Releases&quot; /&gt;
+&lt;/center&gt;
+
+&lt;div style=&quot;line-height:60%;&quot;&gt;
+    &lt;br /&gt;
+&lt;/div&gt;
+
+&lt;p&gt;If we consider Flink 1.8 (Apr. 2019) as the baseline, the Flink community more than &lt;strong&gt;tripled&lt;/strong&gt; the number of implemented and/or resolved issues in a single release with the support of an &lt;strong&gt;additional ~100 contributors&lt;/strong&gt; in Flink 1.11. This is pretty impressive on its own, and even more so if you consider that Flink contributors are distributed around the globe, working across different locations and timezones!&lt;/p&gt;
+
+&lt;hr /&gt;
+
+&lt;h2 id=&quot;first-time-contributor-guide&quot;&gt;First-time Contributor Guide&lt;/h2&gt;
+
+&lt;p&gt;Flink has an extensive guide for &lt;a href=&quot;https://flink.apache.org/contributing/how-to-contribute.html&quot;&gt;code and non-code contributions&lt;/a&gt; that helps new community members navigate the project and get familiar with existing contribution guidelines. In particular for code contributions, knowing where to start can be difficult, given the sheer size of the Flink codebase and the pace of development of the project.&lt;/p&gt;
+
+&lt;p&gt;To better guide new contributors, a brief section was added to the guide on &lt;a href=&quot;https://flink.apache.org/contributing/contribute-code.html#looking-for-what-to-contribute&quot;&gt;how to look for what to contribute&lt;/a&gt; and the &lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-18704?filter=12349196&quot;&gt;&lt;em&gt;starter&lt;/em&gt; label&lt;/a&gt; has been revived in Jira to highlight issues that are suitable for first-time contributors.&lt;/p&gt;
+
+&lt;div class=&quot;alert alert-info&quot;&gt;
+  &lt;p&gt;&lt;span class=&quot;label label-info&quot; style=&quot;display: inline-block&quot;&gt;&lt;span class=&quot;glyphicon glyphicon-info-sign&quot; aria-hidden=&quot;true&quot;&gt;&lt;/span&gt; Note &lt;/span&gt;
+As a reminder, you no longer need to ask for contributor permissions to start contributing to Flink. Once you’ve found something you’d like to work on, read the &lt;a href=&quot;https://flink.apache.org/contributing/contribute-code.html&quot;&gt;contribution guide&lt;/a&gt; carefully and reach out to a Flink Committer, who will be able to help you get started.&lt;/p&gt;
+&lt;/div&gt;
+
+&lt;h2 id=&quot;replacing-charged-words-in-the-flink-repo&quot;&gt;Replacing “charged” words in the Flink repo&lt;/h2&gt;
+
+&lt;p&gt;The community is working on gradually replacing words that are outdated and carry a negative connotation in the Flink codebase, such as “master/slave” and “whitelist/blacklist”. The progress of this work can be tracked in &lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-18209&quot;&gt;FLINK-18209&lt;/a&gt;.&lt;/p&gt;
+
+&lt;hr /&gt;
+
+&lt;h1 id=&quot;upcoming-events-and-more&quot;&gt;Upcoming Events (and More!)&lt;/h1&gt;
+
+&lt;p&gt;We’re happy to see the “high season” of virtual events approaching, with a lot of great conferences taking place in the coming month, as well as some meetups. Here, we highlight some of the Flink talks happening in those events, but we recommend checking out the complete event programs!&lt;/p&gt;
+
+&lt;p&gt;As usual, we also leave you with some resources to read and explore.&lt;/p&gt;
+
+&lt;table class=&quot;table table-bordered&quot;&gt;
+  &lt;thead&gt;
+    &lt;tr&gt;
+      &lt;th&gt;Category&lt;/th&gt;
+      &lt;th&gt;&lt;/th&gt;
+    &lt;/tr&gt;
+  &lt;/thead&gt;
+  &lt;tbody&gt;
+    &lt;tr&gt;
+      &lt;td&gt;&lt;span class=&quot;glyphicon glyphicon glyphicon-console&quot; aria-hidden=&quot;true&quot;&gt;&lt;/span&gt; Events&lt;/td&gt;
+      &lt;td&gt;&lt;ul&gt;
+        &lt;b&gt;Virtual Flink Meetup (Jul. 29)&lt;/b&gt;
+        &lt;p&gt;&lt;a href=&quot;https://www.meetup.com/seattle-flink/events/271922632/&quot;&gt;What’s new in Flink 1.11? + Q&amp;amp;A with Aljoscha Krettek&lt;/a&gt;&lt;/p&gt;
+      &lt;/ul&gt;
+      &lt;ul&gt;
+        &lt;b&gt;DC Thursday (Jul. 30)&lt;/b&gt;
+        &lt;p&gt;&lt;a href=&quot;https://www.eventbrite.com/e/dc-thurs-apache-flink-w-stephan-ewen-tickets-112137488246?utm_campaign=Events%20%26%20Talks&amp;amp;utm_content=135006406&amp;amp;utm_medium=social&amp;amp;utm_source=twitter&amp;amp;hss_channel=tw-2581958070&quot;&gt;Interview and Community Q&amp;amp;A with Stephan Ewen&lt;/a&gt;&lt;/p&gt;
+      &lt;/ul&gt;
+      &lt;ul&gt;
+        &lt;b&gt;KubeCon + CloudNativeCon Europe (Aug. 17-20)&lt;/b&gt;
+        &lt;p&gt;&lt;a href=&quot;https://kccnceu20.sched.com/event/ZelA/stateful-serverless-and-the-elephant-in-the-room-stephan-ewen-ververica&quot;&gt;Stateful Serverless and the Elephant in the Room&lt;/a&gt;&lt;/p&gt;
+      &lt;/ul&gt;
+      &lt;ul&gt;
+        &lt;b&gt;DataEngBytes (Aug. 20-21)&lt;/b&gt;
+        &lt;p&gt;&lt;a href=&quot;https://dataengconf.com.au/&quot;&gt;Change Data Capture with Flink SQL and Debezium&lt;/a&gt;&lt;/p&gt;
+        &lt;p&gt;&lt;a href=&quot;https://dataengconf.com.au/&quot;&gt;Sweet Streams are Made of These: Data Driven Development with Stream Processing&lt;/a&gt;&lt;/p&gt;
+      &lt;/ul&gt;
+      &lt;ul&gt;
+        &lt;b&gt;Beam Summit (Aug. 24-29)&lt;/b&gt;
+        &lt;p&gt;&lt;a href=&quot;https://2020.beamsummit.org/sessions/streaming-fast-slow/&quot;&gt;Streaming, Fast and Slow&lt;/a&gt;&lt;/p&gt;
+        &lt;p&gt;&lt;a href=&quot;https://2020.beamsummit.org/sessions/building-stateful-streaming-pipelines/&quot;&gt;Building Stateful Streaming Pipelines With Beam&lt;/a&gt;&lt;/p&gt;
+      &lt;/ul&gt;
+    &lt;/td&gt;
+    &lt;/tr&gt;
+    &lt;tr&gt;
+      &lt;td&gt;&lt;span class=&quot;glyphicon glyphicon-fire&quot; aria-hidden=&quot;true&quot;&gt;&lt;/span&gt; Blogposts&lt;/td&gt;
+        &lt;td&gt;&lt;ul&gt;
+        &lt;b&gt;Flink 1.11 Series&lt;/b&gt;
+        &lt;li&gt;&lt;a href=&quot;https://flink.apache.org/news/2020/07/14/application-mode.html&quot;&gt;Application Deployment in Flink: Current State and the new Application Mode&lt;/a&gt;&lt;/li&gt;
+        &lt;li&gt;&lt;a href=&quot;https://flink.apache.org/2020/07/23/catalogs.html&quot;&gt;Sharing is caring - Catalogs in Flink SQL (Tutorial)&lt;/a&gt;&lt;/li&gt;
+        &lt;li&gt;&lt;a href=&quot;https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html&quot;&gt;Flink SQL Demo: Building an End-to-End Streaming Application (Tutorial)&lt;/a&gt;&lt;/li&gt;
+        &lt;p&gt;&lt;/p&gt;
+        &lt;b&gt;Other&lt;/b&gt;
+        &lt;li&gt;&lt;a href=&quot;https://blogs.oracle.com/javamagazine/streaming-analytics-with-java-and-apache-flink?source=:em:nw:mt::RC_WWMK200429P00043:NSL400072808&amp;amp;elq_mid=167902&amp;amp;sh=162609181316181313222609291604350235&amp;amp;cmid=WWMK200429P00043C0004&quot;&gt;Streaming analytics with Java and Apache Flink (Tutorial)&lt;/a&gt;&lt;/li&gt;
+        &lt;li&gt;&lt;a href=&quot;https://www.ververica.com/blog/flink-for-online-machine-learning-and-real-time-processing-at-weibo&quot;&gt;Flink for online Machine Learning and real-time processing at Weibo&lt;/a&gt;&lt;/li&gt;
+        &lt;li&gt;&lt;a href=&quot;https://www.ververica.com/blog/data-driven-matchmaking-at-azar-with-apache-flink&quot;&gt;Data-driven Matchmaking at Azar with Apache Flink&lt;/a&gt;&lt;/li&gt;
+      &lt;/ul&gt;
+      &lt;/td&gt;
+    &lt;/tr&gt;
+    &lt;tr&gt;
+      &lt;td&gt;&lt;span class=&quot;glyphicon glyphicon glyphicon-certificate&quot; aria-hidden=&quot;true&quot;&gt;&lt;/span&gt; Flink Packages&lt;/td&gt;
+      &lt;td&gt;&lt;ul&gt;&lt;p&gt;&lt;a href=&quot;https://flink-packages.org/&quot;&gt;Flink Packages&lt;/a&gt; is a website where you can explore (and contribute to) the Flink &lt;br /&gt; ecosystem of connectors, extensions, APIs, tools and integrations. &lt;b&gt;New in:&lt;/b&gt; &lt;/p&gt;
+          &lt;li&gt;&lt;a href=&quot;https://flink-packages.org/packages/flink-metrics-signalfx&quot;&gt; SignalFx Metrics Reporter&lt;/a&gt;&lt;/li&gt;
+          &lt;li&gt;&lt;a href=&quot;https://flink-packages.org/packages/yauaa&quot;&gt;Yauaa: Yet Another UserAgent Analyzer&lt;/a&gt;&lt;/li&gt;
+      &lt;/ul&gt;
+    &lt;/td&gt;
+    &lt;/tr&gt;
+  &lt;/tbody&gt;
+&lt;/table&gt;
+
+&lt;hr /&gt;
+
+&lt;p&gt;If you’d like to keep a closer eye on what’s happening in the community, subscribe to the Flink &lt;a href=&quot;https://flink.apache.org/community.html#mailing-lists&quot;&gt;@community mailing list&lt;/a&gt; to get fine-grained weekly updates, upcoming event announcements and more.&lt;/p&gt;
+</description>
+<pubDate>Mon, 27 Jul 2020 10:00:00 +0200</pubDate>
+<link>https://flink.apache.org/news/2020/07/27/community-update.html</link>
+<guid isPermaLink="true">/news/2020/07/27/community-update.html</guid>
+</item>
+
+<item>
 <title>Sharing is caring - Catalogs in Flink SQL</title>
 <description>&lt;p&gt;With an ever-growing number of people working with data, it’s a common practice for companies to build self-service platforms with the goal of democratizing their access across different teams and — especially — to enable users from any background to be independent in their data needs. In such environments, metadata management becomes a crucial aspect. Without it, users often work blindly, spending too much time searching for datasets and their location, figuring ou [...]
 
@@ -16320,221 +17219,5 @@ While you can embed Spouts/Bolts in a Flink program and mix-and-match them with
 <guid isPermaLink="true">/news/2015/12/11/storm-compatibility.html</guid>
 </item>
 
-<item>
-<title>Introducing Stream Windows in Apache Flink</title>
-<description>&lt;p&gt;The data analysis space is witnessing an evolution from batch to stream processing for many use cases. Although batch can be handled as a special case of stream processing, analyzing never-ending streaming data often requires a shift in the mindset and comes with its own terminology (for example, “windowing” and “at-least-once”/”exactly-once” processing). This shift and the new terminology can be quite confusing for people being new to the space of stream processing [...]
-
-&lt;p&gt;In this blog post, we discuss the concept of windows for stream processing, present Flink’s built-in windows, and explain its support for custom windowing semantics.&lt;/p&gt;
-
-&lt;h2 id=&quot;what-are-windows-and-what-are-they-good-for&quot;&gt;What are windows and what are they good for?&lt;/h2&gt;
-
-&lt;p&gt;Consider the example of a traffic sensor that counts every 15 seconds the number of vehicles passing a certain location. The resulting stream could look like:&lt;/p&gt;
-
-&lt;center&gt;
-&lt;img src=&quot;/img/blog/window-intro/window-stream.png&quot; style=&quot;width:75%;margin:15px&quot; /&gt;
-&lt;/center&gt;
-
-&lt;p&gt;If you would like to know how many vehicles passed that location, you would simply sum the individual counts. However, the nature of a sensor stream is that it continuously produces data. Such a stream never ends, and it is not possible to compute a final sum that can be returned. Instead, it is possible to compute rolling sums, i.e., return for each input event an updated sum record. This would yield a new stream of partial sums.&lt;/p&gt;
-
-&lt;center&gt;
-&lt;img src=&quot;/img/blog/window-intro/window-rolling-sum.png&quot; style=&quot;width:75%;margin:15px&quot; /&gt;
-&lt;/center&gt;
-
-&lt;p&gt;However, a stream of partial sums might not be what we are looking for, because it constantly updates the count and, more importantly, some information such as variation over time is lost. Hence, we might want to rephrase our question and ask for the number of cars that pass the location every minute. This requires us to group the elements of the stream into finite sets, each set corresponding to sixty seconds. This operation is called a &lt;em&gt;tumbling windows&lt;/em&gt; o [...]
-
-&lt;center&gt;
-&lt;img src=&quot;/img/blog/window-intro/window-tumbling-window.png&quot; style=&quot;width:75%;margin:15px&quot; /&gt;
-&lt;/center&gt;
-
-&lt;p&gt;Tumbling windows discretize a stream into non-overlapping windows. For certain applications it is important that windows are not disjoint because an application might require smoothed aggregates. For example, we can compute every thirty seconds the number of cars that passed in the last minute. Such windows are called &lt;em&gt;sliding windows&lt;/em&gt;.&lt;/p&gt;
-
-&lt;center&gt;
-&lt;img src=&quot;/img/blog/window-intro/window-sliding-window.png&quot; style=&quot;width:75%;margin:15px&quot; /&gt;
-&lt;/center&gt;
-
-&lt;p&gt;Defining windows on a data stream as discussed before is a non-parallel operation. This is because each element of a stream must be processed by the same window operator that decides which windows the element should be added to. Windows on a full stream are called &lt;em&gt;AllWindows&lt;/em&gt; in Flink. For many applications, a data stream needs to be grouped into multiple logical streams on each of which a window operator can be applied. Think for example about a stream of ve [...]
-
-&lt;center&gt;
-&lt;img src=&quot;/img/blog/window-intro/windows-keyed.png&quot; style=&quot;width:75%;margin:15px&quot; /&gt;
-&lt;/center&gt;
-
-&lt;p&gt;Generally speaking, a window defines a finite set of elements on an unbounded stream. This set can be based on time (as in our previous examples), element counts, a combination of counts and time, or some custom logic to assign elements to windows. Flink’s DataStream API provides concise operators for the most common window operations as well as a generic windowing mechanism that allows users to define highly customized windowing logic. In the following, we present Flink’s time and cou [...]
-
-&lt;h2 id=&quot;time-windows&quot;&gt;Time Windows&lt;/h2&gt;
-
-&lt;p&gt;As their name suggests, time windows group stream elements by time. For example, a tumbling time window of one minute collects elements for one minute and applies a function to all elements in the window after one minute has passed.&lt;/p&gt;
-
-&lt;p&gt;Defining tumbling and sliding time windows in Apache Flink is very easy:&lt;/p&gt;
-
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-scala&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// Stream of (sensorId, carCnt)&lt;/span&gt;
-&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;vehicleCnts&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[(&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;...&lt;/span&gt;
-
-&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;tumblingCnts&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[(&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;vehicleCn [...]
-  &lt;span class=&quot;c1&quot;&gt;// key stream by sensorId&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;keyBy&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; 
-  &lt;span class=&quot;c1&quot;&gt;// tumbling time window of 1 minute length&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;timeWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;nc&quot;&gt;Time&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;minutes&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;))&lt;/span&gt;
-  &lt;span class=&quot;c1&quot;&gt;// compute sum over carCnt&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;sum&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; 
-
-&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;slidingCnts&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[(&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;vehicleCnt [...]
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;keyBy&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; 
-  &lt;span class=&quot;c1&quot;&gt;// sliding time window of 1 minute length and 30 secs trigger interval&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;timeWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;nc&quot;&gt;Time&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;minutes&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;),&lt;/span&gt; &lt;span class=&quot;nc&quot;&gt;Time&lt;/span&gt;&lt; [...]
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;sum&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
-
-&lt;p&gt;There is one aspect that we haven’t discussed yet, namely the exact meaning of “&lt;em&gt;collects elements for one minute&lt;/em&gt;” which boils down to the question, “&lt;em&gt;How does the stream processor interpret time?&lt;/em&gt;”.&lt;/p&gt;
-
-&lt;p&gt;Apache Flink features three different notions of time, namely &lt;em&gt;processing time&lt;/em&gt;, &lt;em&gt;event time&lt;/em&gt;, and &lt;em&gt;ingestion time&lt;/em&gt;.&lt;/p&gt;
-
-&lt;ol&gt;
-  &lt;li&gt;In &lt;strong&gt;processing time&lt;/strong&gt;, windows are defined with respect to the wall clock of the machine that builds and processes a window, i.e., a one minute processing time window collects elements for exactly one minute.&lt;/li&gt;
-  &lt;li&gt;In &lt;strong&gt;event time&lt;/strong&gt;, windows are defined with respect to timestamps that are attached to each event record. This is common for many types of events, such as log entries, sensor data, etc., where the timestamp usually represents the time at which the event occurred. Event time has several benefits over processing time. First of all, it decouples the program semantics from the actual serving speed of the source and the processing performance of the system. Hen [...]
-  &lt;li&gt;&lt;strong&gt;Ingestion time&lt;/strong&gt; is a hybrid of processing and event time. It assigns wall clock timestamps to records as soon as they arrive in the system (at the source) and continues processing with event time semantics based on the attached timestamps.&lt;/li&gt;
-&lt;/ol&gt;
-
-&lt;h2 id=&quot;count-windows&quot;&gt;Count Windows&lt;/h2&gt;
-
-&lt;p&gt;Apache Flink also features count windows. A tumbling count window of 100 will collect 100 events in a window and evaluate the window when the 100th element has been added.&lt;/p&gt;
-
-&lt;p&gt;In Flink’s DataStream API, tumbling and sliding count windows are defined as follows:&lt;/p&gt;
-
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-scala&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// Stream of (sensorId, carCnt)&lt;/span&gt;
-&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;vehicleCnts&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[(&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;...&lt;/span&gt;
-
-&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;tumblingCnts&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[(&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;vehicleCn [...]
-  &lt;span class=&quot;c1&quot;&gt;// key stream by sensorId&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;keyBy&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;
-  &lt;span class=&quot;c1&quot;&gt;// tumbling count window of 100 elements size&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;countWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;100&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;
-  &lt;span class=&quot;c1&quot;&gt;// compute the carCnt sum &lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;sum&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;
-
-&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;slidingCnts&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[(&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;Int&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;vehicleCnt [...]
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;keyBy&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;
-  &lt;span class=&quot;c1&quot;&gt;// sliding count window of 100 elements size and 10 elements trigger interval&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;countWindow&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;100&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span class=&quot;mi&quot;&gt;10&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;sum&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
-
-&lt;h2 id=&quot;dissecting-flinks-windowing-mechanics&quot;&gt;Dissecting Flink’s windowing mechanics&lt;/h2&gt;
-
-&lt;p&gt;Flink’s built-in time and count windows cover a wide range of common window use cases. However, there are of course applications that require custom windowing logic that cannot be addressed by Flink’s built-in windows. In order to also support applications that need very specific windowing semantics, the DataStream API exposes interfaces for the internals of its windowing mechanics. These interfaces give very fine-grained control over the way that windows are built and evaluate [...]
-
-&lt;p&gt;The following figure depicts Flink’s windowing mechanism and introduces the components involved.&lt;/p&gt;
-
-&lt;center&gt;
-&lt;img src=&quot;/img/blog/window-intro/window-mechanics.png&quot; style=&quot;width:90%;margin:15px&quot; /&gt;
-&lt;/center&gt;
-
-&lt;p&gt;Elements that arrive at a window operator are handed to a &lt;code&gt;WindowAssigner&lt;/code&gt;. The WindowAssigner assigns elements to one or more windows, possibly creating new windows. A &lt;code&gt;Window&lt;/code&gt; itself is just an identifier for a list of elements and may provide some optional meta information, such as begin and end time in case of a &lt;code&gt;TimeWindow&lt;/code&gt;. Note that an element can be added to multiple windows, which also means that multi [...]
-
-&lt;p&gt;Each window owns a &lt;code&gt;Trigger&lt;/code&gt; that decides when the window is evaluated or purged. The trigger is called for each element that is inserted into the window and when a previously registered timer times out. On each event, a trigger can decide to fire (i.e., evaluate), purge (remove the window and discard its content), or fire and then purge the window. A trigger that just fires evaluates the window and keeps it as it is, i.e., all elements remain in the windo [...]
-
-&lt;p&gt;When a Trigger fires, the list of window elements can be given to an optional &lt;code&gt;Evictor&lt;/code&gt;. The evictor can iterate through the list and decide to cut off some elements from the start of the list, i.e., remove some of the elements that entered the window first. The remaining elements are given to an evaluation function. If no Evictor was defined, the Trigger hands all the window elements directly to the evaluation function.&lt;/p&gt;
-
-&lt;p&gt;The evaluation function receives the elements of a window (possibly filtered by an Evictor) and computes one or more result elements for the window. The DataStream API accepts different types of evaluation functions, including predefined aggregation functions such as &lt;code&gt;sum()&lt;/code&gt;, &lt;code&gt;min()&lt;/code&gt;, &lt;code&gt;max()&lt;/code&gt;, as well as a &lt;code&gt;ReduceFunction&lt;/code&gt;, &lt;code&gt;FoldFunction&lt;/code&gt;, or &lt;code&gt;WindowFunct [...]
-
-&lt;p&gt;These are the components that constitute Flink’s windowing mechanics. We now show step-by-step how to implement custom windowing logic with the DataStream API. We start with a stream of type &lt;code&gt;DataStream[IN]&lt;/code&gt; and key it using a key selector function that extracts a key of type &lt;code&gt;KEY&lt;/code&gt; to obtain a &lt;code&gt;KeyedStream[IN, KEY]&lt;/code&gt;.&lt;/p&gt;
-
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-scala&quot;&gt;&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;input&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;IN&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;spa [...]
-
-&lt;span class=&quot;c1&quot;&gt;// created a keyed stream using a key selector function&lt;/span&gt;
-&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;keyed&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;KeyedStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;IN&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;KEY&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;input&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;keyBy&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;myKeySel&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;IN&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span class=&quot;o&quot;&gt;=&amp;gt;&lt;/span&gt; &lt;span class=&quot;nc&quot;&gt;KEY&lt;/span&gt;&lt [...]
-
-&lt;p&gt;We apply a &lt;code&gt;WindowAssigner[IN, WINDOW]&lt;/code&gt; that creates windows of type &lt;code&gt;WINDOW&lt;/code&gt; resulting in a &lt;code&gt;WindowedStream[IN, KEY, WINDOW]&lt;/code&gt;. In addition, a &lt;code&gt;WindowAssigner&lt;/code&gt; also provides a default &lt;code&gt;Trigger&lt;/code&gt; implementation.&lt;/p&gt;
-
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-scala&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// create windowed stream using a WindowAssigner&lt;/span&gt;
-&lt;span class=&quot;k&quot;&gt;var&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowed&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;WindowedStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;IN&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;KEY&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;WINDOW&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt; [...]
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;window&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;myAssigner&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;WindowAssigner&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;IN&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;WINDOW&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;])&lt [...]
-
-&lt;p&gt;We can explicitly specify a &lt;code&gt;Trigger&lt;/code&gt; to override the default &lt;code&gt;Trigger&lt;/code&gt; provided by the &lt;code&gt;WindowAssigner&lt;/code&gt;. Note that specifying a trigger does not add an additional trigger condition but replaces the current trigger.&lt;/p&gt;
-
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-scala&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// override the default trigger of the WindowAssigner&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;windowed&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowed&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;trigger&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;myTrigger&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;Trigger&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;IN&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;WINDOW&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;])&lt;/span& [...]
-
-&lt;p&gt;We may want to specify an optional &lt;code&gt;Evictor&lt;/code&gt; as follows.&lt;/p&gt;
-
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-scala&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// specify an optional evictor&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;windowed&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowed&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;evictor&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;myEvictor&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;Evictor&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;IN&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;WINDOW&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;])&lt;/span& [...]
-
-&lt;p&gt;Finally, we apply a &lt;code&gt;WindowFunction&lt;/code&gt; that returns elements of type &lt;code&gt;OUT&lt;/code&gt; to obtain a &lt;code&gt;DataStream[OUT]&lt;/code&gt;.&lt;/p&gt;
-
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code class=&quot;language-scala&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// apply window function to windowed stream&lt;/span&gt;
-&lt;span class=&quot;k&quot;&gt;val&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;output&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;DataStream&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;OUT&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;]&lt;/span&gt; &lt;span class=&quot;k&quot;&gt;=&lt;/span&gt; &lt;span class=&quot;n&quot;&gt;windowed&lt;/span&gt;
-  &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;apply&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;n&quot;&gt;myWinFunc&lt;/span&gt;&lt;span class=&quot;k&quot;&gt;:&lt;/span&gt; &lt;span class=&quot;kt&quot;&gt;WindowFunction&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span class=&quot;kt&quot;&gt;IN&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;OUT&lt;/span&gt;, &lt;span class=&quot;kt&quot;&gt;KEY&lt; [...]
-
-&lt;p&gt;With Flink’s internal windowing mechanics and their exposure through the DataStream API, it is possible to implement highly customized windowing logic, such as session windows or windows that emit early results if the values exceed a certain threshold.&lt;/p&gt;
-
-&lt;h2 id=&quot;conclusion&quot;&gt;Conclusion&lt;/h2&gt;
-
-&lt;p&gt;Support for various types of windows over continuous data streams is a must-have for modern stream processors. Apache Flink is a stream processor with a very strong feature set, including a very flexible mechanism to build and evaluate windows over continuous data streams. Flink provides pre-defined window operators for common use cases as well as a toolbox that allows users to define highly customized windowing logic. The Flink community will add more pre-defined window operators as we le [...]
-</description>
-<pubDate>Fri, 04 Dec 2015 11:00:00 +0100</pubDate>
-<link>https://flink.apache.org/news/2015/12/04/Introducing-windows.html</link>
-<guid isPermaLink="true">/news/2015/12/04/Introducing-windows.html</guid>
-</item>
-
-<item>
-<title>Flink 0.10.1 released</title>
-<description>&lt;p&gt;Today, the Flink community released the first bugfix release of the 0.10 series of Flink.&lt;/p&gt;
-
-&lt;p&gt;We recommend that all users update to this release by bumping the version of your Flink dependencies and updating the binaries on the server.&lt;/p&gt;
-
-&lt;h2 id=&quot;issues-fixed&quot;&gt;Issues fixed&lt;/h2&gt;
-
-&lt;ul class=&quot;list-unstyled&quot;&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-2879&quot;&gt;FLINK-2879&lt;/a&gt;] -         Links in documentation are broken
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-2938&quot;&gt;FLINK-2938&lt;/a&gt;] -         Streaming docs not in sync with latest state changes
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-2942&quot;&gt;FLINK-2942&lt;/a&gt;] -         Dangling operators in web UI&amp;#39;s program visualization (non-deterministic)
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-2967&quot;&gt;FLINK-2967&lt;/a&gt;] -         TM address detection might not always detect the right interface on slow networks / overloaded JMs
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-2977&quot;&gt;FLINK-2977&lt;/a&gt;] -         Cannot access HBase in a Kerberos secured Yarn cluster
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-2987&quot;&gt;FLINK-2987&lt;/a&gt;] -         Flink 0.10 fails to start on YARN 2.6.0
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-2989&quot;&gt;FLINK-2989&lt;/a&gt;] -         Job Cancel button doesn&amp;#39;t work on Yarn
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3005&quot;&gt;FLINK-3005&lt;/a&gt;] -         Commons-collections object deserialization remote command execution vulnerability
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3011&quot;&gt;FLINK-3011&lt;/a&gt;] -         Cannot cancel failing/restarting streaming job from the command line
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3019&quot;&gt;FLINK-3019&lt;/a&gt;] -         CLI does not list running/restarting jobs
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3020&quot;&gt;FLINK-3020&lt;/a&gt;] -         Local streaming execution: set number of task manager slots to the maximum parallelism
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3024&quot;&gt;FLINK-3024&lt;/a&gt;] -         TimestampExtractor Does not Work When returning Long.MIN_VALUE
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3032&quot;&gt;FLINK-3032&lt;/a&gt;] -         Flink does not start on Hadoop 2.7.1 (HDP), due to class conflict
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3043&quot;&gt;FLINK-3043&lt;/a&gt;] -         Kafka Connector description in Streaming API guide is wrong/outdated
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3047&quot;&gt;FLINK-3047&lt;/a&gt;] -         Local batch execution: set number of task manager slots to the maximum parallelism
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3052&quot;&gt;FLINK-3052&lt;/a&gt;] -         Optimizer does not push properties out of bulk iterations
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-2966&quot;&gt;FLINK-2966&lt;/a&gt;] -         Improve the way job duration is reported on web frontend.
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-2974&quot;&gt;FLINK-2974&lt;/a&gt;] -         Add periodic offset commit to Kafka Consumer if checkpointing is disabled
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3028&quot;&gt;FLINK-3028&lt;/a&gt;] -         Cannot cancel restarting job via web frontend
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3040&quot;&gt;FLINK-3040&lt;/a&gt;] -         Add docs describing how to configure State Backends
-&lt;/li&gt;
-&lt;li&gt;[&lt;a href=&quot;https://issues.apache.org/jira/browse/FLINK-3041&quot;&gt;FLINK-3041&lt;/a&gt;] -         Twitter Streaming Description section of Streaming Programming guide refers to an incorrect example &amp;#39;TwitterLocal&amp;#39;
-&lt;/li&gt;
-&lt;/ul&gt;
-
-</description>
-<pubDate>Fri, 27 Nov 2015 09:00:00 +0100</pubDate>
-<link>https://flink.apache.org/news/2015/11/27/release-0.10.1.html</link>
-<guid isPermaLink="true">/news/2015/11/27/release-0.10.1.html</guid>
-</item>
-
 </channel>
 </rss>
diff --git a/content/blog/index.html b/content/blog/index.html
index 10fc2aa..009ad78 100644
--- a/content/blog/index.html
+++ b/content/blog/index.html
@@ -196,6 +196,19 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></h2>
+
+      <p>30 Jul 2020
+       Alexander Fedulov (<a href="https://twitter.com/alex_fedulov">@alex_fedulov</a>)</p>
+
+      <p>In this series of blog posts you will learn about powerful Flink patterns for building streaming applications.</p>
+
+      <p><a href="/news/2020/07/30/demo-fraud-detection-3.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></h2>
 
       <p>28 Jul 2020
@@ -330,21 +343,6 @@ and provide a tutorial for running Streaming ETL with Flink on Zeppelin.</p>
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2020/06/09/release-statefun-2.1.0.html">Stateful Functions 2.1.0 Release Announcement</a></h2>
-
-      <p>09 Jun 2020
-       Marta Paes (<a href="https://twitter.com/morsapaes">@morsapaes</a>)</p>
-
-      <p><p>The Apache Flink community is happy to announce the release of Stateful Functions (StateFun) 2.1.0! This release introduces new features around state expiration and performance improvements for co-located deployments, as well as other important changes that improve the stability and testability of the project. As the community around StateFun grows, the release cycle will follow this pattern of smaller and more frequent releases to incorporate user feedback and allow for fast [...]
-
-</p>
-
-      <p><a href="/news/2020/06/09/release-statefun-2.1.0.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -377,6 +375,16 @@ and provide a tutorial for running Streaming ETL with Flink on Zeppelin.</p>
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page10/index.html b/content/blog/page10/index.html
index b6ab567..676b25e 100644
--- a/content/blog/page10/index.html
+++ b/content/blog/page10/index.html
@@ -196,6 +196,20 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2016/05/24/stream-sql.html">Stream Processing for Everyone with SQL and Apache Flink</a></h2>
+
+      <p>24 May 2016 by Fabian Hueske (<a href="https://twitter.com/">@fhueske</a>)
+      </p>
+
+      <p><p>About six months ago, the Apache Flink community started an effort to add a SQL interface for stream data analysis. SQL is <i>the</i> standard language to access and process data. Everybody who occasionally analyzes data is familiar with SQL. Consequently, a SQL interface for stream data processing will make this technology accessible to a much wider audience. Moreover, SQL support for streaming data will also enable new use cases such as interactive and ad-hoc stream analysi [...]
+<p>In this blog post, we report on the current status, architectural design, and future plans of the Apache Flink community to implement support for SQL as a language for analyzing data streams.</p></p>
+
+      <p><a href="/news/2016/05/24/stream-sql.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2016/05/11/release-1.0.3.html">Flink 1.0.3 Released</a></h2>
 
       <p>11 May 2016
@@ -324,20 +338,6 @@
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2015/12/04/Introducing-windows.html">Introducing Stream Windows in Apache Flink</a></h2>
-
-      <p>04 Dec 2015 by Fabian Hueske (<a href="https://twitter.com/">@fhueske</a>)
-      </p>
-
-      <p><p>The data analysis space is witnessing an evolution from batch to stream processing for many use cases. Although batch can be handled as a special case of stream processing, analyzing never-ending streaming data often requires a shift in the mindset and comes with its own terminology (for example, “windowing” and “at-least-once”/”exactly-once” processing). This shift and the new terminology can be quite confusing for people who are new to the space of stream processing. Apache F [...]
-<p>In this blog post, we discuss the concept of windows for stream processing, present Flink's built-in windows, and explain its support for custom windowing semantics.</p></p>
-
-      <p><a href="/news/2015/12/04/Introducing-windows.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -370,6 +370,16 @@
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page11/index.html b/content/blog/page11/index.html
index 1ebdfd5..9f40763 100644
--- a/content/blog/page11/index.html
+++ b/content/blog/page11/index.html
@@ -196,6 +196,20 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2015/12/04/Introducing-windows.html">Introducing Stream Windows in Apache Flink</a></h2>
+
+      <p>04 Dec 2015 by Fabian Hueske (<a href="https://twitter.com/">@fhueske</a>)
+      </p>
+
+      <p><p>The data analysis space is witnessing an evolution from batch to stream processing for many use cases. Although batch can be handled as a special case of stream processing, analyzing never-ending streaming data often requires a shift in the mindset and comes with its own terminology (for example, “windowing” and “at-least-once”/”exactly-once” processing). This shift and the new terminology can be quite confusing for people who are new to the space of stream processing. Apache F [...]
+<p>In this blog post, we discuss the concept of windows for stream processing, present Flink's built-in windows, and explain its support for custom windowing semantics.</p></p>
+
+      <p><a href="/news/2015/12/04/Introducing-windows.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2015/11/27/release-0.10.1.html">Flink 0.10.1 released</a></h2>
 
       <p>27 Nov 2015
@@ -333,26 +347,6 @@ vertex-centric or gather-sum-apply to Flink dataflows.</p>
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2015/04/13/release-0.9.0-milestone1.html">Announcing Flink 0.9.0-milestone1 preview release</a></h2>
-
-      <p>13 Apr 2015
-      </p>
-
-      <p><p>The Apache Flink community is pleased to announce the availability of
-the 0.9.0-milestone-1 release. The release is a preview of the
-upcoming 0.9.0 release. It contains many new features which will be
-available in the upcoming 0.9 release. Interested users are encouraged
-to try it out and give feedback. As the version number indicates, this
-release is a preview release that contains known issues.</p>
-
-</p>
-
-      <p><a href="/news/2015/04/13/release-0.9.0-milestone1.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -385,6 +379,16 @@ release is a preview release that contains known issues.</p>
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page12/index.html b/content/blog/page12/index.html
index 6237f5c..ba6dd47 100644
--- a/content/blog/page12/index.html
+++ b/content/blog/page12/index.html
@@ -196,6 +196,26 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2015/04/13/release-0.9.0-milestone1.html">Announcing Flink 0.9.0-milestone1 preview release</a></h2>
+
+      <p>13 Apr 2015
+      </p>
+
+      <p><p>The Apache Flink community is pleased to announce the availability of
+the 0.9.0-milestone-1 release. The release is a preview of the
+upcoming 0.9.0 release. It contains many new features which will be
+available in the upcoming 0.9 release. Interested users are encouraged
+to try it out and give feedback. As the version number indicates, this
+release is a preview release that contains known issues.</p>
+
+</p>
+
+      <p><a href="/news/2015/04/13/release-0.9.0-milestone1.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2015/04/07/march-in-flink.html">March 2015 in the Flink community</a></h2>
 
       <p>07 Apr 2015
@@ -335,21 +355,6 @@ and offers a new API including definition of flexible windows.</p>
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2014/10/03/upcoming_events.html">Upcoming Events</a></h2>
-
-      <p>03 Oct 2014
-      </p>
-
-      <p><p>We are happy to announce several upcoming Flink events both in Europe and the US. Starting with a <strong>Flink hackathon in Stockholm</strong> (Oct 8-9) and a talk about Flink at the <strong>Stockholm Hadoop User Group</strong> (Oct 8). This is followed by the very first <strong>Flink Meetup in Berlin</strong> (Oct 15). In the US, there will be two Flink Meetup talks: the first one at the <strong>Pasadena Big Data User Group</strong> (Oct 29) and the second one at <strong>Si [...]
-
-</p>
-
-      <p><a href="/news/2014/10/03/upcoming_events.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -382,6 +387,16 @@ and offers a new API including definition of flexible windows.</p>
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page13/index.html b/content/blog/page13/index.html
index 642ef2f..3da2d59 100644
--- a/content/blog/page13/index.html
+++ b/content/blog/page13/index.html
@@ -196,6 +196,21 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2014/10/03/upcoming_events.html">Upcoming Events</a></h2>
+
+      <p>03 Oct 2014
+      </p>
+
+      <p><p>We are happy to announce several upcoming Flink events both in Europe and the US. Starting with a <strong>Flink hackathon in Stockholm</strong> (Oct 8-9) and a talk about Flink at the <strong>Stockholm Hadoop User Group</strong> (Oct 8). This is followed by the very first <strong>Flink Meetup in Berlin</strong> (Oct 15). In the US, there will be two Flink Meetup talks: the first one at the <strong>Pasadena Big Data User Group</strong> (Oct 29) and the second one at <strong>Si [...]
+
+</p>
+
+      <p><a href="/news/2014/10/03/upcoming_events.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2014/09/26/release-0.6.1.html">Apache Flink 0.6.1 available</a></h2>
 
       <p>26 Sep 2014
@@ -260,6 +275,16 @@ academic and open source project that Flink originates from.</p>
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page2/index.html b/content/blog/page2/index.html
index 4cd9fed..59e48c1 100644
--- a/content/blog/page2/index.html
+++ b/content/blog/page2/index.html
@@ -196,6 +196,21 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2020/06/09/release-statefun-2.1.0.html">Stateful Functions 2.1.0 Release Announcement</a></h2>
+
+      <p>09 Jun 2020
+       Marta Paes (<a href="https://twitter.com/morsapaes">@morsapaes</a>)</p>
+
+      <p><p>The Apache Flink community is happy to announce the release of Stateful Functions (StateFun) 2.1.0! This release introduces new features around state expiration and performance improvements for co-located deployments, as well as other important changes that improve the stability and testability of the project. As the community around StateFun grows, the release cycle will follow this pattern of smaller and more frequent releases to incorporate user feedback and allow for fast [...]
+
+</p>
+
+      <p><a href="/news/2020/06/09/release-statefun-2.1.0.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2020/05/12/release-1.10.1.html">Apache Flink 1.10.1 Released</a></h2>
 
       <p>12 May 2020
@@ -319,21 +334,6 @@ This release marks a big milestone: Stateful Functions 2.0 is not only an API up
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/features/2020/03/27/flink-for-data-warehouse.html">Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration</a></h2>
-
-      <p>27 Mar 2020
-       Bowen Li (<a href="https://twitter.com/Bowen__Li">@Bowen__Li</a>)</p>
-
-      <p><p>In this blog post, you will learn our motivation behind the Flink-Hive integration, and how Flink 1.10 can help modernize your data warehouse.</p>
-
-</p>
-
-      <p><a href="/features/2020/03/27/flink-for-data-warehouse.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -366,6 +366,16 @@ This release marks a big milestone: Stateful Functions 2.0 is not only an API up
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page3/index.html b/content/blog/page3/index.html
index 3e2ec2e..3f87858 100644
--- a/content/blog/page3/index.html
+++ b/content/blog/page3/index.html
@@ -196,6 +196,21 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/features/2020/03/27/flink-for-data-warehouse.html">Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration</a></h2>
+
+      <p>27 Mar 2020
+       Bowen Li (<a href="https://twitter.com/Bowen__Li">@Bowen__Li</a>)</p>
+
+      <p><p>In this blog post, you will learn our motivation behind the Flink-Hive integration, and how Flink 1.10 can help modernize your data warehouse.</p>
+
+</p>
+
+      <p><a href="/features/2020/03/27/flink-for-data-warehouse.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2020/03/24/demo-fraud-detection-2.html">Advanced Flink Application Patterns Vol.2: Dynamic Updates of Application Logic</a></h2>
 
       <p>24 Mar 2020
@@ -318,19 +333,6 @@
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2019/12/09/flink-kubernetes-kudo.html">Running Apache Flink on Kubernetes with KUDO</a></h2>
-
-      <p>09 Dec 2019
-       Gerred Dillon </p>
-
-      <p>A common use case for Apache Flink is streaming data analytics together with Apache Kafka, which provides a pub/sub model and durability for data streams. In this post, we demonstrate how to orchestrate Flink and Kafka with KUDO.</p>
-
-      <p><a href="/news/2019/12/09/flink-kubernetes-kudo.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -363,6 +365,16 @@
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page4/index.html b/content/blog/page4/index.html
index e8d0b7f..e9e5df6 100644
--- a/content/blog/page4/index.html
+++ b/content/blog/page4/index.html
@@ -196,6 +196,19 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2019/12/09/flink-kubernetes-kudo.html">Running Apache Flink on Kubernetes with KUDO</a></h2>
+
+      <p>09 Dec 2019
+       Gerred Dillon </p>
+
+      <p>A common use case for Apache Flink is streaming data analytics together with Apache Kafka, which provides a pub/sub model and durability for data streams. In this post, we demonstrate how to orchestrate Flink and Kafka with KUDO.</p>
+
+      <p><a href="/news/2019/12/09/flink-kubernetes-kudo.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2019/11/25/query-pulsar-streams-using-apache-flink.html">How to query Pulsar Streams using Apache Flink</a></h2>
 
       <p>25 Nov 2019
@@ -321,19 +334,6 @@
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/2019/06/05/flink-network-stack.html">A Deep-Dive into Flink's Network Stack</a></h2>
-
-      <p>05 Jun 2019
-       Nico Kruber </p>
-
-      <p>Flink’s network stack is one of the core components that make up Apache Flink's runtime module sitting at the core of every Flink job. In this post, which is the first in a series of posts about the network stack, we look at the abstractions exposed to the stream operators and detail their physical implementation and various optimisations in Apache Flink.</p>
-
-      <p><a href="/2019/06/05/flink-network-stack.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -366,6 +366,16 @@
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page5/index.html b/content/blog/page5/index.html
index 10506ba..dac3fb9 100644
--- a/content/blog/page5/index.html
+++ b/content/blog/page5/index.html
@@ -196,6 +196,19 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/2019/06/05/flink-network-stack.html">A Deep-Dive into Flink's Network Stack</a></h2>
+
+      <p>05 Jun 2019
+       Nico Kruber </p>
+
+      <p>Flink’s network stack is one of the core components that make up Apache Flink's runtime module sitting at the core of every Flink job. In this post, which is the first in a series of posts about the network stack, we look at the abstractions exposed to the stream operators and detail their physical implementation and various optimisations in Apache Flink.</p>
+
+      <p><a href="/2019/06/05/flink-network-stack.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/2019/05/19/state-ttl.html">State TTL in Flink 1.8.0: How to Automatically Cleanup Application State in Apache Flink</a></h2>
 
       <p>19 May 2019
@@ -322,21 +335,6 @@ for more details.</p>
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2019/02/15/release-1.7.2.html">Apache Flink 1.7.2 Released</a></h2>
-
-      <p>15 Feb 2019
-      </p>
-
-      <p><p>The Apache Flink community released the second bugfix version of the Apache Flink 1.7 series.</p>
-
-</p>
-
-      <p><a href="/news/2019/02/15/release-1.7.2.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -369,6 +367,16 @@ for more details.</p>
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page6/index.html b/content/blog/page6/index.html
index 1a3ead7..4bfda30 100644
--- a/content/blog/page6/index.html
+++ b/content/blog/page6/index.html
@@ -196,6 +196,21 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2019/02/15/release-1.7.2.html">Apache Flink 1.7.2 Released</a></h2>
+
+      <p>15 Feb 2019
+      </p>
+
+      <p><p>The Apache Flink community released the second bugfix version of the Apache Flink 1.7 series.</p>
+
+</p>
+
+      <p><a href="/news/2019/02/15/release-1.7.2.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2019/02/13/unified-batch-streaming-blink.html">Batch as a Special Case of Streaming and Alibaba's contribution of Blink</a></h2>
 
       <p>13 Feb 2019
@@ -330,21 +345,6 @@ Please check the <a href="https://issues.apache.org/jira/secure/ReleaseNote.jspa
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2018/08/21/release-1.5.3.html">Apache Flink 1.5.3 Released</a></h2>
-
-      <p>21 Aug 2018
-      </p>
-
-      <p><p>The Apache Flink community released the third bugfix version of the Apache Flink 1.5 series.</p>
-
-</p>
-
-      <p><a href="/news/2018/08/21/release-1.5.3.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -377,6 +377,16 @@ Please check the <a href="https://issues.apache.org/jira/secure/ReleaseNote.jspa
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page7/index.html b/content/blog/page7/index.html
index c5d201d..7fb0917 100644
--- a/content/blog/page7/index.html
+++ b/content/blog/page7/index.html
@@ -196,6 +196,21 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2018/08/21/release-1.5.3.html">Apache Flink 1.5.3 Released</a></h2>
+
+      <p>21 Aug 2018
+      </p>
+
+      <p><p>The Apache Flink community released the third bugfix version of the Apache Flink 1.5 series.</p>
+
+</p>
+
+      <p><a href="/news/2018/08/21/release-1.5.3.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2018/08/09/release-1.6.0.html">Apache Flink 1.6.0 Release Announcement</a></h2>
 
       <p>09 Aug 2018
@@ -326,19 +341,6 @@
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2017/12/21/2017-year-in-review.html">Apache Flink in 2017: Year in Review</a></h2>
-
-      <p>21 Dec 2017
-       Chris Ward (<a href="https://twitter.com/chrischinch">@chrischinch</a>) &amp; Mike Winters (<a href="https://twitter.com/wints">@wints</a>)</p>
-
-      <p>As 2017 comes to a close, let's take a moment to look back on the Flink community's great work during the past year.</p>
-
-      <p><a href="/news/2017/12/21/2017-year-in-review.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -371,6 +373,16 @@
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page8/index.html b/content/blog/page8/index.html
index 7055ac8..4151269 100644
--- a/content/blog/page8/index.html
+++ b/content/blog/page8/index.html
@@ -196,6 +196,19 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2017/12/21/2017-year-in-review.html">Apache Flink in 2017: Year in Review</a></h2>
+
+      <p>21 Dec 2017
+       Chris Ward (<a href="https://twitter.com/chrischinch">@chrischinch</a>) &amp; Mike Winters (<a href="https://twitter.com/wints">@wints</a>)</p>
+
+      <p>As 2017 comes to a close, let's take a moment to look back on the Flink community's great work during the past year.</p>
+
+      <p><a href="/news/2017/12/21/2017-year-in-review.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2017/12/12/release-1.4.0.html">Apache Flink 1.4.0 Release Announcement</a></h2>
 
       <p>12 Dec 2017
@@ -332,19 +345,6 @@ what’s coming in Flink 1.4.0 as well as a preview of what the Flink community
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2017/03/29/table-sql-api-update.html">From Streams to Tables and Back Again: An Update on Flink's Table & SQL API</a></h2>
-
-      <p>29 Mar 2017 by Timo Walther (<a href="https://twitter.com/">@twalthr</a>)
-      </p>
-
-      <p><p>Broadening the user base and unifying batch & streaming with relational APIs</p></p>
-
-      <p><a href="/news/2017/03/29/table-sql-api-update.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -377,6 +377,16 @@ what’s coming in Flink 1.4.0 as well as a preview of what the Flink community
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/blog/page9/index.html b/content/blog/page9/index.html
index 46a7ff3..e26b9e5 100644
--- a/content/blog/page9/index.html
+++ b/content/blog/page9/index.html
@@ -196,6 +196,19 @@
     <!-- Blog posts -->
     
     <article>
+      <h2 class="blog-title"><a href="/news/2017/03/29/table-sql-api-update.html">From Streams to Tables and Back Again: An Update on Flink's Table & SQL API</a></h2>
+
+      <p>29 Mar 2017 by Timo Walther (<a href="https://twitter.com/">@twalthr</a>)
+      </p>
+
+      <p><p>Broadening the user base and unifying batch & streaming with relational APIs</p></p>
+
+      <p><a href="/news/2017/03/29/table-sql-api-update.html">Continue reading &raquo;</a></p>
+    </article>
+
+    <hr>
+    
+    <article>
       <h2 class="blog-title"><a href="/news/2017/03/23/release-1.1.5.html">Apache Flink 1.1.5 Released</a></h2>
 
       <p>23 Mar 2017
@@ -326,20 +339,6 @@
 
     <hr>
     
-    <article>
-      <h2 class="blog-title"><a href="/news/2016/05/24/stream-sql.html">Stream Processing for Everyone with SQL and Apache Flink</a></h2>
-
-      <p>24 May 2016 by Fabian Hueske (<a href="https://twitter.com/">@fhueske</a>)
-      </p>
-
-      <p><p>About six months ago, the Apache Flink community started an effort to add a SQL interface for stream data analysis. SQL is <i>the</i> standard language to access and process data. Everybody who occasionally analyzes data is familiar with SQL. Consequently, a SQL interface for stream data processing will make this technology accessible to a much wider audience. Moreover, SQL support for streaming data will also enable new use cases such as interactive and ad-hoc stream analysi [...]
-<p>In this blog post, we report on the current status, architectural design, and future plans of the Apache Flink community to implement support for SQL as a language for analyzing data streams.</p></p>
-
-      <p><a href="/news/2016/05/24/stream-sql.html">Continue reading &raquo;</a></p>
-    </article>
-
-    <hr>
-    
 
     <!-- Pagination links -->
     
@@ -372,6 +371,16 @@
 
     <ul id="markdown-toc">
       
+      <li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></li>
+
+      
+        
+      
+    
+      
+      
+
+      
       <li><a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></li>
 
       
diff --git a/content/img/blog/patterns-blog-3/evaluation-delays.png b/content/img/blog/patterns-blog-3/evaluation-delays.png
new file mode 100644
index 0000000..2edf942
Binary files /dev/null and b/content/img/blog/patterns-blog-3/evaluation-delays.png differ
diff --git a/content/img/blog/patterns-blog-3/keyed-state-scoping.png b/content/img/blog/patterns-blog-3/keyed-state-scoping.png
new file mode 100644
index 0000000..fa6fcc3
Binary files /dev/null and b/content/img/blog/patterns-blog-3/keyed-state-scoping.png differ
diff --git a/content/img/blog/patterns-blog-3/late-events.png b/content/img/blog/patterns-blog-3/late-events.png
new file mode 100644
index 0000000..5bdfad1
Binary files /dev/null and b/content/img/blog/patterns-blog-3/late-events.png differ
diff --git a/content/img/blog/patterns-blog-3/pre-aggregation.png b/content/img/blog/patterns-blog-3/pre-aggregation.png
new file mode 100644
index 0000000..8005dc5
Binary files /dev/null and b/content/img/blog/patterns-blog-3/pre-aggregation.png differ
diff --git a/content/img/blog/patterns-blog-3/sample-rule-definition.png b/content/img/blog/patterns-blog-3/sample-rule-definition.png
new file mode 100644
index 0000000..1a625c9
Binary files /dev/null and b/content/img/blog/patterns-blog-3/sample-rule-definition.png differ
diff --git a/content/img/blog/patterns-blog-3/time-windows.png b/content/img/blog/patterns-blog-3/time-windows.png
new file mode 100644
index 0000000..5649e27
Binary files /dev/null and b/content/img/blog/patterns-blog-3/time-windows.png differ
diff --git a/content/img/blog/patterns-blog-3/type-kryo.png b/content/img/blog/patterns-blog-3/type-kryo.png
new file mode 100644
index 0000000..eaf7153
Binary files /dev/null and b/content/img/blog/patterns-blog-3/type-kryo.png differ
diff --git a/content/img/blog/patterns-blog-3/type-pojo.png b/content/img/blog/patterns-blog-3/type-pojo.png
new file mode 100644
index 0000000..1f8cef0
Binary files /dev/null and b/content/img/blog/patterns-blog-3/type-pojo.png differ
diff --git a/content/img/blog/patterns-blog-3/widest-window.png b/content/img/blog/patterns-blog-3/widest-window.png
new file mode 100644
index 0000000..0376d64
Binary files /dev/null and b/content/img/blog/patterns-blog-3/widest-window.png differ
diff --git a/content/img/blog/patterns-blog-3/window-clean-up.png b/content/img/blog/patterns-blog-3/window-clean-up.png
new file mode 100644
index 0000000..32b2bdc
Binary files /dev/null and b/content/img/blog/patterns-blog-3/window-clean-up.png differ
diff --git a/content/index.html b/content/index.html
index e24b1c7..f0a9067 100644
--- a/content/index.html
+++ b/content/index.html
@@ -568,6 +568,9 @@
 
   <dl>
       
+        <dt> <a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></dt>
+        <dd>In this series of blog posts you will learn about powerful Flink patterns for building streaming applications.</dd>
+      
         <dt> <a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></dt>
         <dd>Apache Flink 1.11 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.</dd>
       
@@ -583,16 +586,6 @@
         <dd><p>The Apache Flink community released the first bugfix version of the Apache Flink 1.11 series.</p>
 
 </dd>
-      
-        <dt> <a href="/news/2020/07/14/application-mode.html">Application Deployment in Flink: Current State and the new Application Mode</a></dt>
-        <dd><p>With the rise of stream processing and real-time analytics as a critical tool for modern 
-businesses, an increasing number of organizations build platforms with Apache Flink at their
-core and offer it internally as a service. Many talks with related topics from companies 
-like <a href="https://www.youtube.com/watch?v=VX3S9POGAdU">Uber</a>, <a href="https://www.youtube.com/watch?v=VX3S9POGAdU">Netflix</a>
-and <a href="https://www.youtube.com/watch?v=cH9UdK0yYjc">Alibaba</a> in the latest editions of Flink Forward further 
-illustrate this trend.</p>
-
-</dd>
     
   </dl>
 
diff --git a/content/news/2020/07/30/demo-fraud-detection-3.html b/content/news/2020/07/30/demo-fraud-detection-3.html
new file mode 100644
index 0000000..8037937
--- /dev/null
+++ b/content/news/2020/07/30/demo-fraud-detection-3.html
@@ -0,0 +1,908 @@
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
+    <title>Apache Flink: Advanced Flink Application Patterns Vol.3: Custom Window Processing</title>
+    <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
+    <link rel="icon" href="/favicon.ico" type="image/x-icon">
+
+    <!-- Bootstrap -->
+    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/css/bootstrap.min.css">
+    <link rel="stylesheet" href="/css/flink.css">
+    <link rel="stylesheet" href="/css/syntax.css">
+
+    <!-- Blog RSS feed -->
+    <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />
+
+    <!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
+    <!-- We need to load Jquery in the header for custom google analytics event tracking-->
+    <script src="/js/jquery.min.js"></script>
+
+    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
+    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
+    <!--[if lt IE 9]>
+      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
+      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
+    <![endif]-->
+  </head>
+  <body>  
+    
+
+    <!-- Main content. -->
+    <div class="container">
+    <div class="row">
+
+      
+     <div id="sidebar" class="col-sm-3">
+        
+
+<!-- Top navbar. -->
+    <nav class="navbar navbar-default">
+        <!-- The logo. -->
+        <div class="navbar-header">
+          <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+          </button>
+          <div class="navbar-logo">
+            <a href="/">
+              <img alt="Apache Flink" src="/img/flink-header-logo.svg" width="147px" height="73px">
+            </a>
+          </div>
+        </div><!-- /.navbar-header -->
+
+        <!-- The navigation links. -->
+        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
+          <ul class="nav navbar-nav navbar-main">
+
+            <!-- First menu section explains visitors what Flink is -->
+
+            <!-- What is Stream Processing? -->
+            <!--
+            <li><a href="/streamprocessing1.html">What is Stream Processing?</a></li>
+            -->
+
+            <!-- What is Flink? -->
+            <li><a href="/flink-architecture.html">What is Apache Flink?</a></li>
+
+            
+
+            <!-- What is Stateful Functions? -->
+
+            <li><a href="/stateful-functions.html">What is Stateful Functions?</a></li>
+
+            <!-- Use cases -->
+            <li><a href="/usecases.html">Use Cases</a></li>
+
+            <!-- Powered by -->
+            <li><a href="/poweredby.html">Powered By</a></li>
+
+
+            &nbsp;
+            <!-- Second menu section aims to support Flink users -->
+
+            <!-- Downloads -->
+            <li><a href="/downloads.html">Downloads</a></li>
+
+            <!-- Getting Started -->
+            <li class="dropdown">
+              <a class="dropdown-toggle" data-toggle="dropdown" href="#">Getting Started<span class="caret"></span></a>
+              <ul class="dropdown-menu">
+                <li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/getting-started/index.html" target="_blank">With Flink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+                <li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/getting-started/project-setup.html" target="_blank">With Flink Stateful Functions <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+                <li><a href="/training.html">Training Course</a></li>
+              </ul>
+            </li>
+
+            <!-- Documentation -->
+            <li class="dropdown">
+              <a class="dropdown-toggle" data-toggle="dropdown" href="#">Documentation<span class="caret"></span></a>
+              <ul class="dropdown-menu">
+                <li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11" target="_blank">Flink 1.11 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+                <li><a href="https://ci.apache.org/projects/flink/flink-docs-master" target="_blank">Flink Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+                <li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1" target="_blank">Flink Stateful Functions 2.1 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+                <li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-master" target="_blank">Flink Stateful Functions Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+              </ul>
+            </li>
+
+            <!-- getting help -->
+            <li><a href="/gettinghelp.html">Getting Help</a></li>
+
+            <!-- Blog -->
+            <li class="active"><a href="/blog/"><b>Flink Blog</b></a></li>
+
+
+            <!-- Flink-packages -->
+            <li>
+              <a href="https://flink-packages.org" target="_blank">flink-packages.org <small><span class="glyphicon glyphicon-new-window"></span></small></a>
+            </li>
+            &nbsp;
+
+            <!-- Third menu section aim to support community and contributors -->
+
+            <!-- Community -->
+            <li><a href="/community.html">Community &amp; Project Info</a></li>
+
+            <!-- Roadmap -->
+            <li><a href="/roadmap.html">Roadmap</a></li>
+
+            <!-- Contribute -->
+            <li><a href="/contributing/how-to-contribute.html">How to Contribute</a></li>
+            
+
+            <!-- GitHub -->
+            <li>
+              <a href="https://github.com/apache/flink" target="_blank">Flink on GitHub <small><span class="glyphicon glyphicon-new-window"></span></small></a>
+            </li>
+
+            &nbsp;
+
+            <!-- Language Switcher -->
+            <li>
+              
+                
+                  <!-- link to the Chinese home page when current is blog page -->
+                  <a href="/zh">中文版</a>
+                
+              
+            </li>
+
+          </ul>
+
+          <ul class="nav navbar-nav navbar-bottom">
+          <hr />
+
+            <!-- Twitter -->
+            <li><a href="https://twitter.com/apacheflink" target="_blank">@ApacheFlink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+
+            <!-- Visualizer -->
+            <li class=" hidden-md hidden-sm"><a href="/visualizer/" target="_blank">Plan Visualizer <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+
+          <hr />
+
+            <li><a href="https://apache.org" target="_blank">Apache Software Foundation <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+
+            <li>
+              <style>
+                .smalllinks:link {
+                  display: inline-block !important; background: none; padding-top: 0px; padding-bottom: 0px; padding-right: 0px; min-width: 75px;
+                }
+              </style>
+
+              <a class="smalllinks" href="https://www.apache.org/licenses/" target="_blank">License</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
+
+              <a class="smalllinks" href="https://www.apache.org/security/" target="_blank">Security</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
+
+              <a class="smalllinks" href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Donate</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
+
+              <a class="smalllinks" href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
+            </li>
+
+          </ul>
+        </div><!-- /.navbar-collapse -->
+    </nav>
+
+      </div>
+      <div class="col-sm-9">
+      <div class="row-fluid">
+  <div class="col-sm-12">
+    <div class="row">
+      <h1>Advanced Flink Application Patterns Vol.3: Custom Window Processing</h1>
+      <p><i></i></p>
+
+      <article>
+        <p>30 Jul 2020 Alexander Fedulov (<a href="https://twitter.com/alex_fedulov">@alex_fedulov</a>)</p>
+
+<style type="text/css">
+.tg  {border-collapse:collapse;border-spacing:0;}
+.tg td{padding:10px 10px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;}
+.tg th{padding:10px 10px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;background-color:#eff0f1;}
+.tg .tg-wide{padding:10px 30px;}
+.tg .tg-top{vertical-align:top}
+.tg .tg-topcenter{text-align:center;vertical-align:top}
+.tg .tg-center{text-align:center;vertical-align:center}
+</style>
+
+<h2 id="introduction">Introduction</h2>
+
+<p>In the previous articles of the series, we described how you can achieve
+flexible stream partitioning based on dynamically-updated configurations
+(a set of fraud-detection rules) and how you can utilize Flink's
+Broadcast mechanism to distribute processing configuration at runtime
+among the relevant operators. </p>
+
+<p>Picking up where we left off with the end-to-end
+solution last time, in this article we will describe how you can use the
+"Swiss Army knife" of Flink - the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html"><em>Process Function</em></a> - to create an
+implementation that is tailor-made to match your streaming business
+logic requirements. Our discussion will continue in the context of the
+<a href="/news/2020/01/15/demo-fraud-detection.html#fraud-detection-demo">Fraud Detection engine</a>. We will also demonstrate how you can
+implement your own <strong>custom replacement for time windows</strong> for cases
+where the out-of-the-box windowing available from the DataStream API
+does not satisfy your requirements. In particular, we will look at the
+trade-offs that you can make when designing a solution that requires
+low-latency reactions to individual events.</p>
+
+<p>This article will describe some high-level concepts that can be applied
+independently, but it is recommended that you review the material in
+<a href="/news/2020/01/15/demo-fraud-detection.html">part one</a> and
+<a href="/news/2020/03/24/demo-fraud-detection-2.html">part two</a> of the series as well as checkout the <a href="https://github.com/afedulov/fraud-detection-demo">code
+base</a> in order to make
+it easier to follow along.</p>
+
+<h2 id="processfunction-as-a-window">ProcessFunction as a “Window”</h2>
+
+<h3 id="low-latency">Low Latency</h3>
+
+<p>Let’s start with a reminder of the type of fraud detection rule that we
+would like to support:</p>
+
+<p><em>“Whenever the <strong>sum</strong> of  <strong>payments</strong> from the same <strong>payer</strong> to the
+same <strong>beneficiary</strong> within <strong>a 24 hour
+period</strong> is <strong>greater</strong> than <strong>200 000 $</strong> - trigger an alert.”</em></p>
+
+<p>In other words, given a stream of transactions partitioned by a key that
+combines the payer and the beneficiary fields, we would like to look
+back in time and determine, for each incoming transaction, if the sum of
+all previous payments between the two specific participants exceeds the
+defined threshold. In effect, the computation window is always moved
+along to the position of the last observed event for a particular data
+partitioning key.</p>
+
+<center>
+<img src="/img/blog/patterns-blog-3/time-windows.png" width="600px" alt="Figure 1: Time Windows" />
+<br />
+<i><small>Figure 1: Time Windows</small></i>
+</center>
+<p><br /></p>
+
+<p>One of the common key requirements for a fraud detection system is <em>low
+response time</em>. The sooner the fraudulent action gets detected, the
+higher the chances that it can be blocked and its negative consequences
+mitigated. This requirement is especially prominent in the financial
+domain, where you have one important constraint - any time spent
+evaluating a fraud detection model is time that a law-abiding user of
+your system will spend waiting for a response. Swiftness of processing
+often becomes a competitive advantage between payment systems,
+and the time limit for producing an alert can be as low as <em>300-500
+ms</em>. This is all the time you get from the moment of ingestion of a
+transaction event into a fraud detection system until an alert has to
+become available to downstream systems. </p>
+
+<p>As you might know, Flink provides a powerful <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html">Window
+API</a>
+that is applicable for a wide range of use cases. However, if you go
+over all of the available types of supported windows, you will realize
+that none of them exactly matches our main requirement for this use case -
+the low-latency evaluation of <em>each</em> incoming transaction. There is
+no type of window in Flink that can express the <em>“x minutes/hours/days
+back from the <u>current event</u>”</em> semantic. In the Window API, events
+fall into windows (as defined by the window
+<a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#window-assigners">assigners</a>),
+but they cannot themselves individually control the creation and
+evaluation of windows*. As described above, our goal for the fraud
+detection engine is to achieve immediate evaluation of the previous
+relevant data points as soon as the new event is received. This raises
+the question of whether the Window API is a feasible fit for this case. The Window API offers some options for defining custom triggers, evictors, and window assigners, which could, in principle, produce the required result. However, it is usually difficult to get this right (and easy to break). Moreover, this approach does not provide access to broadcast state, which is required for implementing dynamic reconfiguration of business rules.</p>
+
+<p>*) apart from session windows, which are limited to assignments
+based on session <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#session-windows">gaps</a></p>
+
+<center>
+<img src="/img/blog/patterns-blog-3/evaluation-delays.png" width="600px" alt="Figure 2: Evaluation Delays" />
+<br />
+<i><small>Figure 2: Evaluation Delays</small></i>
+</center>
+<p><br /></p>
+
+<p>Let’s take an example of using a <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#sliding-windows">sliding
+window</a>
+from Flink’s Window API. Using sliding windows with a slide of <em>S</em>
+translates into an expected evaluation delay of <em>S/2</em>.
+This means that you would need to define a window slide of 600-1000 ms
+to fulfill the low-latency requirement of 300-500 ms delay, even before
+taking any actual computation time into account. The fact that Flink
+stores a separate window state for each sliding window pane renders this
+approach infeasible under any moderately high load conditions.</p>
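+
+<p>For illustration, a sliding-window formulation of the sample rule might look
+roughly like the sketch below (the <code>transactions</code> stream, the key extractor,
+and the <code>SumAggregator</code> are hypothetical and not part of the reference implementation):</p>
+
+<div class="highlight"><pre><code class="language-java">// Sketch only: evaluate the 24-hour sum roughly every 500 ms per payer/beneficiary pair.
+DataStream&lt;BigDecimal&gt; totals = transactions
+    .keyBy(t -&gt; t.getPayerId() + "-" + t.getBeneficiaryId())
+    .window(SlidingEventTimeWindows.of(Time.hours(24), Time.milliseconds(500)))
+    .aggregate(new SumAggregator());
+// With a slide of 500 ms, the expected evaluation delay is ~250 ms, but Flink
+// keeps separate window state for every open pane of every key.</code></pre></div>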
+
+<p>In order to satisfy the requirements, we need to create our own
+low-latency window implementation. Luckily, Flink gives us all the tools
+required to do so. <code>ProcessFunction</code> is a low-level, but powerful
+building block in Flink's API. It has a simple contract:</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">SomeProcessFunction</span> <span class="kd">extends</span> <span class="n">KeyedProcessFunction</span><span class="o">&lt;</span><span class="n">KeyType</span><span class="o">,</span> <span class="n">InputType</span><span class="o">,</span> <span class="n">OutputType</span><span class="o">&gt;</span> <span class="o">{</span>
+
+	<span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">InputType</span> <span class="n">event</span><span class="o">,</span> <span class="n">Context</span> <span class="n">ctx</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">OutputType</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">){}</span>
+
+	<span class="kd">public</span> <span class="kt">void</span> <span class="nf">onTimer</span><span class="o">(</span><span class="kt">long</span> <span class="n">timestamp</span><span class="o">,</span> <span class="n">OnTimerContext</span> <span class="n">ctx</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">OutputType</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{}</span>
+
+	<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">parameters</span><span class="o">){}</span>
+<span class="o">}</span></code></pre></div>
+
+<ul>
+  <li>
+    <p><code>processElement()</code> receives input events one by one. You can react to
+each input by producing one or more output events to the next
+operator by calling <code>out.collect(someOutput)</code>. You can also pass data
+to a <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html">side
+output</a>
+or ignore a particular input altogether.</p>
+  </li>
+  <li>
+    <p><code>onTimer()</code> is called by Flink when a previously registered timer
+fires. Both event time and processing time timers are supported (see the short sketch after this list).</p>
+  </li>
+  <li>
+    <p><code>open()</code> is equivalent to a constructor. It is called inside of the
+<a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/glossary.html#flink-taskmanager">TaskManager’s</a>
+JVM, and is used for initialization, such as registering
+Flink-managed state. It is also the right place to initialize fields
+that are not serializable and cannot be transferred from the
+JobManager’s JVM.</p>
+  </li>
+</ul>
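+
+<p>To make this contract more concrete, below is a short generic sketch (it is
+not part of the fraud-detection code; the <code>Event</code> type and the one-minute
+timeout are hypothetical) of how <code>processElement()</code> and <code>onTimer()</code>
+typically interact through the timer service:</p>
+
+<div class="highlight"><pre><code class="language-java">public class TimeoutFunction extends KeyedProcessFunction&lt;String, Event, String&gt; {
+
+  @Override
+  public void processElement(Event event, Context ctx, Collector&lt;String&gt; out) {
+    // register an event-time timer that fires one minute after this event
+    ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 60_000);
+  }
+
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector&lt;String&gt; out) {
+    // called by Flink once the watermark passes the registered timestamp
+    out.collect("Timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
+  }
+}</code></pre></div>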
+
+<p>Most importantly, <code>ProcessFunction</code> also has access to the fault-tolerant
+state, handled by Flink. This combination, together with Flink's
+message processing and delivery guarantees, makes it possible to build
+resilient event-driven applications with almost arbitrarily
+sophisticated business logic. This includes creation and processing of
+custom windows with state.</p>
+
+<h3 id="implementation">Implementation</h3>
+
+<h4 id="state-and-clean-up">State and Clean-up</h4>
+
+<p>In order to be able to process time windows, we need to keep track of
+data belonging to the window inside of our program. To ensure that this
+data is fault-tolerant and can survive failures in a distributed system,
+we should store it inside Flink-managed state. As time
+progresses, we do not need to keep all previous transactions. According
+to the sample rule, all events that are older than 24 hours become
+irrelevant. We are looking at a window of data that constantly moves and
+where stale transactions need to be constantly moved out of scope (in
+other words, cleaned up from state).</p>
+
+<center>
+<img src="/img/blog/patterns-blog-3/window-clean-up.png" width="400px" alt="Figure 3: Window Clean-up" />
+<br />
+<i><small>Figure 3: Window Clean-up</small></i>
+</center>
+<p><br /></p>
+
+<p>We will
+<a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state">use</a>
+<code>MapState</code> to store the individual events of the window. In order to allow
+efficient clean-up of the out-of-scope events, we will utilize event
+timestamps as the <code>MapState</code> keys.</p>
+
+<p>In the general case, we have to take into account that there
+might be different events with exactly the same timestamp. Therefore,
+instead of an individual <code>Transaction</code> per key (timestamp), we will store sets.</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="n">MapState</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Set</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="o">&gt;&gt;</span> <span class="n">windowState</span><span class="o">;</span></code></pre></div>
+
+<div class="alert alert-info">
+  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Side Note </span>
+when any Flink-managed state is used inside a
+<code>KeyedProcessFunction</code>, the data returned by the <code>state.value()</code> call is
+automatically scoped by the key of the <em>currently-processed event</em>
+- see Figure 4. If <code>MapState</code> is used, the same principle applies, with
+the difference that a <code>Map</code> is returned instead of <code>MyObject</code>. If you are
+compelled to do something like
+<code>mapState.value().get(inputEvent.getKey())</code>, you should probably be using
+<code>ValueState</code> instead of the <code>MapState</code>. As we want to store <em>multiple values
+per event key</em>, in our case, <code>MapState</code> is the right choice.</p>
+
+  <p><br /></p>
+
+  <center>
+<img src="/img/blog/patterns-blog-3/keyed-state-scoping.png" width="800px" alt="Figure 4: Keyed State Scoping" />
+<br />
+<i><small>Figure 4: Keyed State Scoping</small></i>
+</center>
+
+</div>
+
+<p>As described in the <a href="/news/2020/01/15/demo-fraud-detection.html">first blog of the series</a>, we are dispatching events based on the keys
+specified in the active fraud detection rules. Multiple distinct rules
+can be based on the same grouping key. This means that our alerting
+function can potentially receive transactions scoped by the same key
+(e.g. <code>{payerId=25;beneficiaryId=12}</code>), but destined to be evaluated
+according to different rules, which implies potentially different
+lengths of the time windows. This raises the question of how we can best
+store fault-tolerant window state within the <code>KeyedProcessFunction</code>. One
+approach would be to create and manage separate <code>MapStates</code> per rule. Such
+an approach, however, would be wasteful - we would separately hold state
+for overlapping time windows, and therefore unnecessarily store
+duplicate events. A better approach is to always store just enough data
+to be able to evaluate all currently active rules which are scoped by
+the same key. In order to achieve that, whenever a new rule is added, we
+will determine if its time window has the largest span and store it in
+the broadcast state under the special reserved <code>WIDEST_RULE_KEY</code>. This
+information will later be used during the state clean-up procedure, as
+described later in this section.</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="nd">@Override</span>
+<span class="kd">public</span> <span class="kt">void</span> <span class="nf">processBroadcastElement</span><span class="o">(</span><span class="n">Rule</span> <span class="n">rule</span><span class="o">,</span> <span class="n">Context</span> <span class="n">ctx</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Alert</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">){</span>
+  <span class="o">...</span>
+  <span class="n">updateWidestWindowRule</span><span class="o">(</span><span class="n">rule</span><span class="o">,</span> <span class="n">broadcastState</span><span class="o">);</span>
+<span class="o">}</span>
+
+<span class="kd">private</span> <span class="kt">void</span> <span class="nf">updateWidestWindowRule</span><span class="o">(</span><span class="n">Rule</span> <span class="n">rule</span><span class="o">,</span> <span class="n">BroadcastState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Rule</span><span class="o">&gt;</span> <span class="n">broadcastState</span><span class="o">){</span>
+  <span class="n">Rule</span> <span class="n">widestWindowRule</span> <span class="o">=</span> <span class="n">broadcastState</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">WIDEST_RULE_KEY</span><span class="o">);</span>
+
+  <span class="k">if</span> <span class="o">(</span><span class="n">widestWindowRule</span> <span class="o">==</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">broadcastState</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">WIDEST_RULE_KEY</span><span class="o">,</span> <span class="n">rule</span><span class="o">);</span>
+    <span class="k">return</span><span class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="k">if</span> <span class="o">(</span><span class="n">widestWindowRule</span><span class="o">.</span><span class="na">getWindowMillis</span><span class="o">()</span> <span class="o">&lt;</span> <span class="n">rule</span><span class="o">.</span><span class="na">getWindowMillis</span><span class="o">())</span> <span class="o">{</span>
+    <span class="n">broadcastState</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">WIDEST_RULE_KEY</span><span class="o">,</span> <span class="n">rule</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+<p>Let’s now look at the implementation of the main method,
+<code>processElement()</code>, in some detail.</p>
+
+<p>In the <a href="/news/2020/01/15/demo-fraud-detection.html#dynamic-data-partitioning">previous blog post</a>, we described how <code>DynamicKeyFunction</code> allowed
+us to perform dynamic data partitioning based on the <code>groupingKeyNames</code>
+parameter in the rule definition. The subsequent description is focused
+around the <code>DynamicAlertFunction</code>, which makes use of the remaining rule
+settings.</p>
+
+<center>
+<img src="/img/blog/patterns-blog-3/sample-rule-definition.png" width="700px" alt="Figure 5: Sample Rule Definition" />
+<br />
+<i><small>Figure 5: Sample Rule Definition</small></i>
+</center>
+<p><br /></p>
+
+<p>As described in the previous parts of the blog post
+series, our alerting process function receives events of type
+<code>Keyed&lt;Transaction, String, Integer&gt;</code>, where <code>Transaction</code> is the main
+“wrapped” event, <code>String</code> is the key (<em>payer #x - beneficiary #y</em> in
+Figure 1), and <code>Integer</code> is the ID of the rule that caused the dispatch of
+this event. This rule was previously <a href="/news/2020/03/24/demo-fraud-detection-2.html#broadcast-state-pattern">stored in the broadcast state</a> and has to be retrieved from that state by the ID. Here is the
+outline of the implementation:</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">DynamicAlertFunction</span>
+    <span class="kd">extends</span> <span class="n">KeyedBroadcastProcessFunction</span><span class="o">&lt;</span>
+        <span class="n">String</span><span class="o">,</span> <span class="n">Keyed</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Rule</span><span class="o">,</span> <span class="n">Alert</span><span class="o">&gt;</span> <span class="o">{</span>
+
+  <span class="kd">private</span> <span class="kd">transient</span> <span class="n">MapState</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Set</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="o">&gt;&gt;</span> <span class="n">windowState</span><span class="o">;</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span>
+      <span class="n">Keyed</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">value</span><span class="o">,</span> <span class="n">ReadOnlyContext</span> <span class="n">ctx</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Alert</span><span class="o">&gt;</span> <s [...]
+
+    <span class="c1">// Add Transaction to state</span>
+    <span class="kt">long</span> <span class="n">currentEventTime</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">getWrapped</span><span class="o">().</span><span class="na">getEventTime</span><span class="o">();</span>                            <span class="c1">// &lt;--- (1)</span>
+    <span class="n">addToStateValuesSet</span><span class="o">(</span><span class="n">windowState</span><span class="o">,</span> <span class="n">currentEventTime</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="na">getWrapped</span><span class="o">());</span>
+
+    <span class="c1">// Calculate the aggregate value</span>
+    <span class="n">Rule</span> <span class="n">rule</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">getBroadcastState</span><span class="o">(</span><span class="n">Descriptors</span><span class="o">.</span><span class="na">rulesDescriptor</span><span class="o">).</span><span class="na">get</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">getId</span><span class="o">());</span>    <span c [...]
+    <span class="n">Long</span> <span class="n">windowStartTimestampForEvent</span> <span class="o">=</span> <span class="n">rule</span><span class="o">.</span><span class="na">getWindowStartTimestampFor</span><span class="o">(</span><span class="n">currentEventTime</span><span class="o">);</span><span class="c1">// &lt;--- (3)</span>
+
+    <span class="n">SimpleAccumulator</span><span class="o">&lt;</span><span class="n">BigDecimal</span><span class="o">&gt;</span> <span class="n">aggregator</span> <span class="o">=</span> <span class="n">RuleHelper</span><span class="o">.</span><span class="na">getAggregator</span><span class="o">(</span><span class="n">rule</span><span class="o">);</span>            <span class="c1">// &lt;--- (4)</span>
+    <span class="k">for</span> <span class="o">(</span><span class="n">Long</span> <span class="n">stateEventTime</span> <span class="o">:</span> <span class="n">windowState</span><span class="o">.</span><span class="na">keys</span><span class="o">())</span> <span class="o">{</span>
+      <span class="k">if</span> <span class="o">(</span><span class="n">isStateValueInWindow</span><span class="o">(</span><span class="n">stateEventTime</span><span class="o">,</span> <span class="n">windowStartForEvent</span><span class="o">,</span> <span class="n">currentEventTime</span><span class="o">))</span> <span class="o">{</span>
+        <span class="n">aggregateValuesInState</span><span class="o">(</span><span class="n">stateEventTime</span><span class="o">,</span> <span class="n">aggregator</span><span class="o">,</span> <span class="n">rule</span><span class="o">);</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+
+    <span class="c1">// Evaluate the rule and trigger an alert if violated</span>
+    <span class="n">BigDecimal</span> <span class="n">aggregateResult</span> <span class="o">=</span> <span class="n">aggregator</span><span class="o">.</span><span class="na">getLocalValue</span><span class="o">();</span>                              <span class="c1">// &lt;--- (5)</span>
+    <span class="kt">boolean</span> <span class="n">isRuleViolated</span> <span class="o">=</span> <span class="n">rule</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">aggregateResult</span><span class="o">);</span>
+    <span class="k">if</span> <span class="o">(</span><span class="n">isRuleViolated</span><span class="o">)</span> <span class="o">{</span>
+      <span class="kt">long</span> <span class="n">decisionTime</span> <span class="o">=</span> <span class="n">System</span><span class="o">.</span><span class="na">currentTimeMillis</span><span class="o">();</span>
+      <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Alert</span><span class="o">&lt;&gt;(</span><span class="n">rule</span><span class="o">.</span><span class="na">getRuleId</span><span class="o">(),</span>
+                              <span class="n">rule</span><span class="o">,</span>
+                              <span class="n">value</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span>
+                              <span class="n">decisionTime</span><span class="o">,</span>
+                              <span class="n">value</span><span class="o">.</span><span class="na">getWrapped</span><span class="o">(),</span>
+                              <span class="n">aggregateResult</span><span class="o">));</span>
+    <span class="o">}</span>
+
+    <span class="c1">// Register timers to ensure state cleanup</span>
+    <span class="kt">long</span> <span class="n">cleanupTime</span> <span class="o">=</span> <span class="o">(</span><span class="n">currentEventTime</span> <span class="o">/</span> <span class="mi">1000</span><span class="o">)</span> <span class="o">*</span> <span class="mi">1000</span><span class="o">;</span>                                  <span class="c1">// &lt;--- (6)</span>
+    <span class="n">ctx</span><span class="o">.</span><span class="na">timerService</span><span class="o">().</span><span class="na">registerEventTimeTimer</span><span class="o">(</span><span class="n">cleanupTime</span><span class="o">);</span>
+  <span class="o">}</span></code></pre></div>
+
+<p><br />
+Here are the details of the steps:<br />
+1)  We first add each new event to our window state:</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="kd">static</span> <span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="n">Set</span><span class="o">&lt;</span><span class="n">V</span><span class="o">&gt;</span> <span class="nf">addToStateValuesSet</span><span class="o">(</span><span class="n">MapState</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span [...]
+      <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+    <span class="n">Set</span><span class="o">&lt;</span><span class="n">V</span><span class="o">&gt;</span> <span class="n">valuesSet</span> <span class="o">=</span> <span class="n">mapState</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">key</span><span class="o">);</span>
+    <span class="k">if</span> <span class="o">(</span><span class="n">valuesSet</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">valuesSet</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
+    <span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
+      <span class="n">valuesSet</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashSet</span><span class="o">&lt;&gt;();</span>
+      <span class="n">valuesSet</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
+    <span class="o">}</span>
+    <span class="n">mapState</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">valuesSet</span><span class="o">);</span>
+    <span class="k">return</span> <span class="n">valuesSet</span><span class="o">;</span>
+<span class="o">}</span></code></pre></div>
+
+<p>2) Next, we retrieve the previously broadcast rule, according to
+    which the incoming transaction needs to be evaluated.</p>
+
+<p>3) <code>getWindowStartTimestampFor</code> determines, given the window span defined
+    in the rule and the current transaction timestamp, how far back in
+    time our evaluation should reach.</p>
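+
+<p>A minimal sketch of what this method could look like, assuming the rule stores
+its window span in milliseconds (accessible via the <code>getWindowMillis()</code>
+method used above):</p>
+
+<div class="highlight"><pre><code class="language-java">// Sketch (inside the Rule class): the window starts exactly one window span
+// before the timestamp of the currently-evaluated event.
+public Long getWindowStartTimestampFor(long eventTimestamp) {
+  return eventTimestamp - getWindowMillis();
+}</code></pre></div>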
+
+<p>4) The aggregate value is calculated by iterating over all window state
+    entries and applying an aggregate function. It could be an <em>average,
+    max, min</em> or, as in the example rule from the beginning of this
+    section, a <em>sum</em>.</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="kd">private</span> <span class="kt">boolean</span> <span class="nf">isStateValueInWindow</span><span class="o">(</span>
+    <span class="n">Long</span> <span class="n">stateEventTime</span><span class="o">,</span> <span class="n">Long</span> <span class="n">windowStartForEvent</span><span class="o">,</span> <span class="kt">long</span> <span class="n">currentEventTime</span><span class="o">)</span> <span class="o">{</span>
+  <span class="k">return</span> <span class="n">stateEventTime</span> <span class="o">&gt;=</span> <span class="n">windowStartForEvent</span> <span class="o">&amp;&amp;</span> <span class="n">stateEventTime</span> <span class="o">&lt;=</span> <span class="n">currentEventTime</span><span class="o">;</span>
+<span class="o">}</span>
+
+<span class="kd">private</span> <span class="kt">void</span> <span class="nf">aggregateValuesInState</span><span class="o">(</span>
+    <span class="n">Long</span> <span class="n">stateEventTime</span><span class="o">,</span> <span class="n">SimpleAccumulator</span><span class="o">&lt;</span><span class="n">BigDecimal</span><span class="o">&gt;</span> <span class="n">aggregator</span><span class="o">,</span> <span class="n">Rule</span> <span class="n">rule</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+  <span class="n">Set</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="o">&gt;</span> <span class="n">inWindow</span> <span class="o">=</span> <span class="n">windowState</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">stateEventTime</span><span class="o">);</span>
+  <span class="k">for</span> <span class="o">(</span><span class="n">Transaction</span> <span class="n">event</span> <span class="o">:</span> <span class="n">inWindow</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">BigDecimal</span> <span class="n">aggregatedValue</span> <span class="o">=</span>
+        <span class="n">FieldsExtractor</span><span class="o">.</span><span class="na">getBigDecimalByName</span><span class="o">(</span><span class="n">rule</span><span class="o">.</span><span class="na">getAggregateFieldName</span><span class="o">(),</span> <span class="n">event</span><span class="o">);</span>
+    <span class="n">aggregator</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">aggregatedValue</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+<p>5) With the aggregate value at hand, we can compare it to the threshold
+    specified in the rule definition and fire an alert, if
+    necessary.</p>
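+
+<p>In the simplest case (a "greater than" rule) this boils down to a comparison and a
+collector call; the <code>getLimit()</code>, <code>getRuleId()</code> accessors and the <code>Alert</code>
+constructor below are assumptions, not the demo's exact API:</p>
+
+<div class="highlight"><pre><code class="language-java">// Sketch only: fire an alert if the aggregate exceeds the rule's threshold.
+if (aggregateResult.compareTo(rule.getLimit()) &gt; 0) {
+  out.collect(new Alert(rule.getRuleId(), aggregateResult));
+}</code></pre></div>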
+
+<p>6) At the end, we register a clean-up timer using
+<code>ctx.timerService().registerEventTimeTimer()</code>. This timer will be
+    responsible for removing the current transaction once it moves out of
+    scope.</p>
+
+<div class="alert alert-info">
+  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span>  Note </span>
+Notice the rounding during timer creation. It is an important technique
+that enables a reasonable trade-off between the precision with which
+the timers will be triggered and the number of timers being used.
+Timers are stored in Flink’s fault-tolerant state, and managing them
+with millisecond-level precision can be wasteful. In our case, with this
+rounding, we will create at most one timer per key in any given second. The Flink documentation provides additional <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#timer-coalescing"><u>details</u></a>.</p>
+</div>
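+
+<p>A sketch of how such a rounded <code>cleanupTime</code> could be derived before the
+<code>registerEventTimeTimer()</code> call shown earlier (the one-second granularity is an
+assumption that matches the note above):</p>
+
+<div class="highlight"><pre><code class="language-java">// Round the timer down to a full second so that at most one timer per key exists in any given second.
+long cleanupTime = (currentEventTime / 1000) * 1000;
+ctx.timerService().registerEventTimeTimer(cleanupTime);</code></pre></div>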
+
+<p>7) The <code>onTimer</code> method will trigger the clean-up of the window state.</p>
+
+<p>As previously described, we always keep as many events in the state as
+required for the evaluation of the active rule with the widest
+window span. This means that during the clean-up, we only need to remove
+the state that falls outside of this widest window.</p>
+
+<center>
+<img src="/img/blog/patterns-blog-3/widest-window.png" width="800px" alt="Figure 6: Widest Window" />
+<br />
+<i><small>Figure 6: Widest Window</small></i>
+</center>
+<p><br /></p>
+
+<p>This is how the clean-up procedure can be implemented:</p>
+
+<div class="highlight"><pre><code class="language-java"><span class="nd">@Override</span>
+<span class="kd">public</span> <span class="kt">void</span> <span class="nf">onTimer</span><span class="o">(</span><span class="kd">final</span> <span class="kt">long</span> <span class="n">timestamp</span><span class="o">,</span> <span class="kd">final</span> <span class="n">OnTimerContext</span> <span class="n">ctx</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Alert</span><span class="o">&gt;</spa [...]
+    <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+
+  <span class="n">Rule</span> <span class="n">widestWindowRule</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">getBroadcastState</span><span class="o">(</span><span class="n">Descriptors</span><span class="o">.</span><span class="na">rulesDescriptor</span><span class="o">).</span><span class="na">get</span><span class="o">(</span><span class="n">WIDEST_RULE_KEY</span><span class="o">);</span>
+
+  <span class="n">Optional</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">cleanupEventTimeWindow</span> <span class="o">=</span>
+      <span class="n">Optional</span><span class="o">.</span><span class="na">ofNullable</span><span class="o">(</span><span class="n">widestWindowRule</span><span class="o">).</span><span class="na">map</span><span class="o">(</span><span class="nl">Rule:</span><span class="o">:</span><span class="n">getWindowMillis</span><span class="o">);</span>
+  <span class="n">Optional</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">cleanupEventTimeThreshold</span> <span class="o">=</span>
+      <span class="n">cleanupEventTimeWindow</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">window</span> <span class="o">-&gt;</span> <span class="n">timestamp</span> <span class="o">-</span> <span class="n">window</span><span class="o">);</span>
+  <span class="c1">// Remove events that are older than (timestamp - widestWindowSpan)ms</span>
+  <span class="n">cleanupEventTimeThreshold</span><span class="o">.</span><span class="na">ifPresent</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">evictOutOfScopeElementsFromWindow</span><span class="o">);</span>
+<span class="o">}</span>
+
+<span class="kd">private</span> <span class="kt">void</span> <span class="nf">evictOutOfScopeElementsFromWindow</span><span class="o">(</span><span class="n">Long</span> <span class="n">threshold</span><span class="o">)</span> <span class="o">{</span>
+  <span class="k">try</span> <span class="o">{</span>
+    <span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">keys</span> <span class="o">=</span> <span class="n">windowState</span><span class="o">.</span><span class="na">keys</span><span class="o">().</span><span class="na">iterator</span><span class="o">();</span>
+    <span class="k">while</span> <span class="o">(</span><span class="n">keys</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+      <span class="n">Long</span> <span class="n">stateEventTime</span> <span class="o">=</span> <span class="n">keys</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
+      <span class="k">if</span> <span class="o">(</span><span class="n">stateEventTime</span> <span class="o">&lt;</span> <span class="n">threshold</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">keys</span><span class="o">.</span><span class="na">remove</span><span class="o">();</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+  <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Exception</span> <span class="n">ex</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">throw</span> <span class="k">new</span> <span class="nf">RuntimeException</span><span class="o">(</span><span class="n">ex</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+<div class="alert alert-info">
+  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span>  Note</span>
+You might be wondering why we did not use <code>ListState</code>, given that we always
+iterate over all of the values of the window state. This is actually
+an optimization for the case when <code>RocksDBStateBackend</code>
+<a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#the-rocksdbstatebackend">is used</a>. Iterating over a <code>ListState</code> would cause all of the <code>Transaction</code>
+objects to be deserialized. Using <code>MapState</code>'s keys iterator only causes
+deserialization of the keys (of type <code>Long</code>), and therefore reduces the
+computational overhead.</p>
+</div>
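+
+<p>For reference, a sketch of how this window state could be declared and initialized in
+<code>open()</code> (the descriptor name is arbitrary):</p>
+
+<div class="highlight"><pre><code class="language-java">// Keyed window state: event timestamp -&gt; set of transactions observed at that timestamp.
+private transient MapState&lt;Long, Set&lt;Transaction&gt;&gt; windowState;
+
+@Override
+public void open(Configuration parameters) {
+  MapStateDescriptor&lt;Long, Set&lt;Transaction&gt;&gt; windowStateDescriptor =
+      new MapStateDescriptor&lt;&gt;(
+          "windowState",
+          BasicTypeInfo.LONG_TYPE_INFO,
+          TypeInformation.of(new TypeHint&lt;Set&lt;Transaction&gt;&gt;() {}));
+  windowState = getRuntimeContext().getMapState(windowStateDescriptor);
+}</code></pre></div>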
+
+<p>This concludes the description of the implementation details. Our
+approach triggers evaluation of a time window as soon as a new
+transaction arrives. It therefore fulfills the main requirement that we
+have targeted - low delay for potentially issuing an alert. For the
+complete implementation, please have a look at
+<a href="https://github.com/afedulov/fraud-detection-demo">the project on github</a>.</p>
+
+<h2 id="improvements-and-optimizations">Improvements and Optimizations</h2>
+
+<p>What are the pros and cons of the described approach?</p>
+
+<p><strong>Pros:</strong></p>
+
+<ul>
+  <li>
+    <p>Low latency capabilities</p>
+  </li>
+  <li>
+    <p>Tailored solution with potential use-case specific optimizations</p>
+  </li>
+  <li>
+    <p>Efficient state reuse (shared state for the rules with the same key)</p>
+  </li>
+</ul>
+
+<p><strong>Cons:</strong></p>
+
+<ul>
+  <li>
+    <p>Cannot make use of potential future optimizations in the existing
+Window API</p>
+  </li>
+  <li>
+    <p>No late event handling, which is available out of the box in the
+Window API</p>
+  </li>
+  <li>
+    <p>Quadratic computation complexity and potentially large state</p>
+  </li>
+</ul>
+
+<p>Let’s now look at the latter two drawbacks and see if we can address
+them.</p>
+
+<h4 id="late-events">Late events:</h4>
+
+<p>Processing late events raises a question - is it still meaningful
+to re-evaluate the window when a late event arrives? If it is, you would
+need to extend the widest window used for the clean-up by your maximum
+expected out-of-orderness. This avoids having potentially incomplete time
+window data for such late firings (see Figure 7).</p>
+
+<center>
+<img src="/img/blog/patterns-blog-3/late-events.png" width="500px" alt="Figure 7: Late Events Handling" />
+<br />
+<i><small>Figure 7: Late Events Handling</small></i>
+</center>
+<p><br /></p>
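+
+<p>Expressed in code, this would only change the clean-up threshold calculation in
+<code>onTimer()</code> shown above; <code>maxOutOfOrderness</code> is an assumed,
+use-case-specific bound:</p>
+
+<div class="highlight"><pre><code class="language-java">// Keep events a bit longer than the widest window so that late firings still see complete data.
+long maxOutOfOrderness = 500; // assumed bound on event-time skew, in ms
+Optional&lt;Long&gt; cleanupEventTimeThreshold =
+    cleanupEventTimeWindow.map(window -&gt; timestamp - window - maxOutOfOrderness);</code></pre></div>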
+
+<p>It can be argued, however, that for a use case which puts emphasis on low
+latency processing, such late triggering would be meaningless. In this
+case, we could keep track of the most recent timestamp that we have
+observed so far and, for events that do not monotonically increase this
+value, only add them to the state while skipping the aggregate calculation
+and the alert triggering logic.</p>
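+
+<p>A sketch of this idea, using an additional <code>ValueState&lt;Long&gt;</code> to remember
+the latest timestamp seen for the key (the state name is arbitrary):</p>
+
+<div class="highlight"><pre><code class="language-java">// lastObservedTimestamp is an assumed ValueState&lt;Long&gt;, initialized in open().
+// This check would run right after the new event has been added to the window state.
+Long lastSeen = lastObservedTimestamp.value();
+if (lastSeen != null &amp;&amp; currentEventTime &lt;= lastSeen) {
+  return; // out-of-order event: stored, but no re-evaluation and no alerting
+}
+lastObservedTimestamp.update(currentEventTime);</code></pre></div>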
+
+<h4 id="redundant-re-computations-and-state-size">Redundant Re-computations and State Size:</h4>
+
+<p>In the implementation described above, we keep individual transactions in
+state and iterate over them to recalculate the aggregate on every new
+event. This is obviously not optimal, as it wastes computational
+resources on repeated calculations.</p>
+
+<p>What is the main reason to keep the individual transactions in state?
+The granularity of stored events directly corresponds to the precision
+of the time window calculation. Because we store transactions
+individually, we can precisely ignore individual transactions as soon as
+they leave the exact 2592000000 ms time window (30 days in ms). At this
+point, it is worth asking - do we really need millisecond precision when
+evaluating such a long time window, or is it
+OK to accept potential false positives in exceptional cases? If the
+answer for your use case is that such precision is not needed, you could
+implement an additional optimization based on bucketing and
+pre-aggregation. The idea of this optimization can be broken down as
+follows (a code sketch follows Figure 8):</p>
+
+<ul>
+  <li>
+    <p>Instead of storing individual events, create a parent class that can
+either contain the fields of a single transaction or combined values
+calculated by applying an aggregate function to a set of
+transactions.</p>
+  </li>
+  <li>
+    <p>Instead of using timestamps in milliseconds as <code>MapState</code> keys, round
+them to the level of “resolution” that you are willing to accept
+(for instance, a full minute). Each entry therefore represents a
+bucket.</p>
+  </li>
+  <li>
+    <p>Whenever a window is evaluated, append the new transaction’s data to
+the bucket aggregate instead of storing individual data points per
+transaction.</p>
+  </li>
+</ul>
+
+<center>
+<img src="/img/blog/patterns-blog-3/pre-aggregation.png" width="700px" alt="Figure 8: Pre-aggregation" />
+<br />
+<i><small>Figure 8: Pre-aggregation</small></i>
+</center>
+<p><br /></p>
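+
+<p>A rough sketch of the bucketing step; the one-minute resolution, the sum aggregate
+and the <code>bucketState</code> name are illustrative choices only:</p>
+
+<div class="highlight"><pre><code class="language-java">// Round the event time down to a full minute; each key of the state then represents a one-minute bucket.
+long bucket = (currentEventTime / 60000) * 60000;
+BigDecimal amount = FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);
+BigDecimal bucketAggregate = bucketState.get(bucket); // bucketState: an assumed MapState&lt;Long, BigDecimal&gt;
+bucketState.put(bucket, bucketAggregate == null ? amount : bucketAggregate.add(amount));</code></pre></div>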
+
+<h4 id="state-data-and-serializers">State Data and Serializers</h4>
+
+<p>Another question that we can ask ourselves in order to further optimize
+the implementation is how probable it is to get different events with
+exactly the same timestamp. In the described implementation, we
+demonstrated one way of approaching this question by storing sets of
+transactions per timestamp in <code>MapState&lt;Long, Set&lt;Transaction&gt;&gt;</code>. Such
+a choice, however, might have a more significant effect on performance
+than one might anticipate. The reason is that Flink does not currently
+provide a native <code>Set</code> serializer and will enforce a fallback to the less
+efficient <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/types_serialization.html#general-class-types">Kryo
+serializer</a>
+instead
+(<a href="https://issues.apache.org/jira/browse/FLINK-16729">FLINK-16729</a>). A
+meaningful alternative strategy is to assume that, in a normal scenario,
+no two distinct events can have exactly the same timestamp and to turn
+the window state into a <code>MapState&lt;Long, Transaction&gt;</code> type. You can use
+<a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html">side-outputs</a>
+to collect and monitor any unexpected occurrences that contradict this
+assumption. During performance optimizations, I generally recommend that you
+<a href="https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#disabling-kryo">disable the fallback to
+Kryo</a>
+and verify where your application might be further optimized by ensuring
+that <a href="https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#performance-comparison">more efficient
+serializers</a>
+are being used.</p>
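+
+<p>Disabling the Kryo fallback is a one-line change on the execution environment:</p>
+
+<div class="highlight"><pre><code class="language-java">StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// Any type that would fall back to the generic Kryo serializer now fails with an exception,
+// which makes inefficient serialization paths visible early.
+env.getConfig().disableGenericTypes();</code></pre></div>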
+
+<div class="alert alert-info">
+  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span>  Tip:</span>
+You can quickly determine which serializer is going to be
+used for your classes by setting a breakpoint and verifying the type of
+the returned <code>TypeInformation</code>.
+<br /></p>
+
+  <center>
+<table class="tg">
+  <tr>
+    <td class="tg-topcenter">
+      <img src="/img/blog/patterns-blog-3/type-pojo.png" alt="POJO" /></td>
+    <td class="tg-topcenter">
+      <i>PojoTypeInfo</i> indicates that an efficient Flink POJO serializer will be used.</td>
+  </tr>
+  <tr>
+    <td class="tg-top">
+      <img src="/img/blog/patterns-blog-3/type-kryo.png" alt="Kryo" /></td>
+    <td class="tg-topcenter">
+      <i>GenericTypeInfo</i> indicates the fallback to a Kryo serializer.</td>
+  </tr>
+</table>
+</center>
+</div>
+
+<p><strong>Event pruning</strong>: instead of storing complete events and putting
+additional stress on the ser/de machinery, we can reduce each individual
+event's data to only the relevant information. This would potentially require
+“unpacking” individual events as fields and storing those fields in a
+generic <code>Map&lt;String, Object&gt;</code> data structure, based on the
+configurations of active rules.</p>
+
+<p>While this adjustment could potentially produce significant improvements
+for objects of large size, it should not be your first pick as it can
+easily turn into a premature optimization.</p>
+
+<h2 id="summary">Summary:</h2>
+
+<p>This article concludes the description of the implementation of the
+fraud detection engine that we started in <a href="/news/2020/01/15/demo-fraud-detection.html">part one</a>. In this blog
+post we demonstrated how <code>ProcessFunction</code> can be utilized to
+"impersonate" a window with sophisticated custom logic. We have
+discussed the pros and cons of such an approach and elaborated on how custom,
+use-case-specific optimizations can be applied - something that would
+not be directly possible with the Window API.</p>
+
+<p>The goal of this blog post was to illustrate the power and flexibility
+of Apache Flink’s APIs. At its core are the pillars of Flink, which
+spare you, as a developer, a very significant amount of work and
+generalize well to a wide range of use cases by providing:</p>
+
+<ul>
+  <li>
+    <p>Efficient data exchange in a distributed cluster</p>
+  </li>
+  <li>
+    <p>Horizontal scalability via data partitioning</p>
+  </li>
+  <li>
+    <p>Fault-tolerant state with quick, local access</p>
+  </li>
+  <li>
+    <p>Convenient abstraction for working with this state, which is as simple as using a
+local variable</p>
+  </li>
+  <li>
+    <p>Multi-threaded, parallel execution engine. <code>ProcessFunction</code> code runs
+in a single thread, without the need for synchronization. Flink
+handles all the parallel execution aspects and correct access to the
+shared state, without you, as a developer, having to think about it
+(concurrency is hard).</p>
+  </li>
+</ul>
+
+<p>All of these aspects make it possible to build applications with Flink that
+go well beyond trivial streaming ETL use cases and enable the implementation
+of arbitrarily sophisticated, distributed event-driven applications.
+With Flink, you can rethink approaches to a wide range of use cases
+which would normally rely on stateless parallel execution nodes
+and “pushing” the concerns of state fault tolerance to a database, an
+approach that is often destined to run into scalability issues in the
+face of ever-increasing data volumes.</p>
+
+      </article>
+    </div>
+
+    <div class="row">
+      <div id="disqus_thread"></div>
+      <script type="text/javascript">
+        /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
+        var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname
+
+        /* * * DON'T EDIT BELOW THIS LINE * * */
+        (function() {
+            var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
+            dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
+             (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
+        })();
+      </script>
+    </div>
+  </div>
+</div>
+      </div>
+    </div>
+
+    <hr />
+
+    <div class="row">
+      <div class="footer text-center col-sm-12">
+        <p>Copyright © 2014-2019 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p>
+        <p>Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.</p>
+        <p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a href="/blog/feed.xml">RSS feed</a></p>
+      </div>
+    </div>
+    </div><!-- /.container -->
+
+    <!-- Include all compiled plugins (below), or include individual files as needed -->
+    <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
+    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery.matchHeight/0.7.0/jquery.matchHeight-min.js"></script>
+    <script src="/js/codetabs.js"></script>
+    <script src="/js/stickysidebar.js"></script>
+
+    <!-- Google Analytics -->
+    <script>
+      (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
+      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+      })(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+
+      ga('create', 'UA-52545728-1', 'auto');
+      ga('send', 'pageview');
+    </script>
+  </body>
+</html>
diff --git a/content/zh/index.html b/content/zh/index.html
index 567c0e8..cc1f2e1 100644
--- a/content/zh/index.html
+++ b/content/zh/index.html
@@ -565,6 +565,9 @@
 
   <dl>
       
+        <dt> <a href="/news/2020/07/30/demo-fraud-detection-3.html">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a></dt>
+        <dd>In this series of blog posts you will learn about powerful Flink patterns for building streaming applications.</dd>
+      
         <dt> <a href="/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html">Flink SQL Demo: Building an End-to-End Streaming Application</a></dt>
         <dd>Apache Flink 1.11 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.</dd>
       
@@ -580,16 +583,6 @@
         <dd><p>The Apache Flink community released the first bugfix version of the Apache Flink 1.11 series.</p>
 
 </dd>
-      
-        <dt> <a href="/news/2020/07/14/application-mode.html">Application Deployment in Flink: Current State and the new Application Mode</a></dt>
-        <dd><p>With the rise of stream processing and real-time analytics as a critical tool for modern 
-businesses, an increasing number of organizations build platforms with Apache Flink at their
-core and offer it internally as a service. Many talks with related topics from companies 
-like <a href="https://www.youtube.com/watch?v=VX3S9POGAdU">Uber</a>, <a href="https://www.youtube.com/watch?v=VX3S9POGAdU">Netflix</a>
-and <a href="https://www.youtube.com/watch?v=cH9UdK0yYjc">Alibaba</a> in the latest editions of Flink Forward further 
-illustrate this trend.</p>
-
-</dd>
     
   </dl>