Posted to commits@flink.apache.org by sj...@apache.org on 2021/05/26 16:40:21 UTC

[flink] branch master updated: [FLINK-22777][docs] Restored full Datastream API fraud detection example in Try Flink section

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e6f152  [FLINK-22777][docs] Restored full Datastream API fraud detection example in Try Flink section
8e6f152 is described below

commit 8e6f152d56edbcd4b88e0158a43311e7f820868c
Author: Stefan Gloutnikov <st...@gloutnikov.com>
AuthorDate: Tue May 25 19:47:55 2021 -0700

    [FLINK-22777][docs] Restored full Datastream API fraud detection example in Try Flink section
    
    This closes #16003
---
 docs/content/docs/try-flink/datastream.md | 476 ++++++++++++++++++++++++++++++
 1 file changed, 476 insertions(+)

diff --git a/docs/content/docs/try-flink/datastream.md b/docs/content/docs/try-flink/datastream.md
index 50e8959..a1059f8 100644
--- a/docs/content/docs/try-flink/datastream.md
+++ b/docs/content/docs/try-flink/datastream.md
@@ -414,6 +414,476 @@ class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
 For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one, where small is anything less than $1.00 and large is more than $500.
 Imagine your fraud detector processes the following stream of transactions for a particular account.
 
+{{< img src="/fig/fraud-transactions.svg" alt="Fraud Transaction" >}}
+
+Transactions 3 and 4 should be marked as fraudulent because they form a small transaction, $0.09, immediately followed by a large one, $510. In contrast, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{< ref "docs/concepts/glossary#managed-state" >}}), and that is why we decided to use a [KeyedProcessFunction]({{< ref "docs/dev/datastream/operators/process_function" >}}). 
+It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is set for that account.
+
+However, merely implementing the flag as a member variable in the `FraudDetector` class will not work. 
+Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means if accounts A and B are routed through the same instance of `FraudDetector`, a transaction for account A could set the flag to true, and then a transaction for account B could set off a false alert. 
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys; however, a simple member variable would not be fault-tolerant, and all its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
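+
+The shared-flag problem can be reproduced without Flink. The following is a minimal plain-Java sketch, not Flink code: `SharedFlagDetector` and `PerKeyFlagDetector` are hypothetical classes introduced only for illustration, and the thresholds are the $1.00 and $500 limits described above.
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+public class SharedFlagSketch {
+
+    // Hypothetical detector that keeps ONE flag for every account it sees.
+    static class SharedFlagDetector {
+        private boolean lastWasSmall = false;
+
+        boolean isFraud(long accountId, double amount) {
+            boolean alert = lastWasSmall && amount > 500.00;
+            lastWasSmall = amount < 1.00;
+            return alert;
+        }
+    }
+
+    // Hypothetical detector that keeps one flag PER account id.
+    static class PerKeyFlagDetector {
+        private final Map<Long, Boolean> lastWasSmall = new HashMap<>();
+
+        boolean isFraud(long accountId, double amount) {
+            boolean alert = lastWasSmall.getOrDefault(accountId, false) && amount > 500.00;
+            lastWasSmall.put(accountId, amount < 1.00);
+            return alert;
+        }
+    }
+
+    public static void main(String[] args) {
+        SharedFlagDetector shared = new SharedFlagDetector();
+        shared.isFraud(1L, 0.09);                        // account A: small transaction
+        System.out.println(shared.isFraud(2L, 510.00));  // account B: prints true (false alert)
+
+        PerKeyFlagDetector perKey = new PerKeyFlagDetector();
+        perKey.isFraud(1L, 0.09);                        // account A: small transaction
+        System.out.println(perKey.isFraud(2L, 510.00));  // account B: prints false
+    }
+}
+```
+
+The per-key map removes the false alert, but it still lives only in the memory of one JVM, so its contents would be lost on a restart.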
+
+To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.
+
+The most basic type of state in Flink is [ValueState]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}), a data type that adds fault tolerance to any variable it wraps.
+`ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_, that is, any operator immediately following `DataStream#keyBy`.
+The _keyed state_ of an operator is automatically scoped to the key of the record that is currently being processed.
+In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account. 
+`ValueState` is created using a `ValueStateDescriptor` which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data.
+The right hook for this is the `open()` method.
+
+{{< tabs "detector-v1" >}}
+{{< tab "Java" >}}
+```java
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient ValueState<Boolean> flagState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+    }
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+  }
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+`ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` in the Java standard library.
+It provides three methods for interacting with its contents: `update` sets the state, `value` gets the current value, and `clear` deletes its contents.
+If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`.
+Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, and so all changes must be performed with `ValueState#update`.
+Otherwise, fault tolerance is managed automatically by Flink under the hood, so you can interact with the state as you would with any standard variable.
+
+Below, you can see an example of how you can use a flag state to track potentially fraudulent transactions.
+
+{{< tabs "processelement-v1" >}}
+{{< tab "Java" >}}
+```java
+@Override
+public void processElement(
+        Transaction transaction,
+        Context context,
+        Collector<Alert> collector) throws Exception {
+
+    // Get the current state for the current key
+    Boolean lastTransactionWasSmall = flagState.value();
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+        if (transaction.getAmount() > LARGE_AMOUNT) {
+            // Output an alert downstream
+            Alert alert = new Alert();
+            alert.setId(transaction.getAccountId());
+
+            collector.collect(alert);            
+        }
+
+        // Clean up our state
+        flagState.clear();
+    }
+
+    if (transaction.getAmount() < SMALL_AMOUNT) {
+        // Set the flag to true
+        flagState.update(true);
+    }
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+override def processElement(
+    transaction: Transaction,
+    context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+    collector: Collector[Alert]): Unit = {
+
+  // Get the current state for the current key
+  val lastTransactionWasSmall = flagState.value
+
+  // Check if the flag is set
+  if (lastTransactionWasSmall != null) {
+    if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+      // Output an alert downstream
+      val alert = new Alert
+      alert.setId(transaction.getAccountId)
+
+      collector.collect(alert)
+    }
+    // Clean up our state
+    flagState.clear()
+  }
+
+  if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+    // set the flag to true
+    flagState.update(true)
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+For every transaction, the fraud detector checks the state of the flag for that account.
+Remember, `ValueState` is always scoped to the current key, i.e., account.
+If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert.
+
+After that check, the flag state is unconditionally cleared.
+Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted.
+
+Finally, the transaction amount is checked to see if it is small.
+If so, then the flag is set so that it can be checked by the next event.
+Notice that `ValueState<Boolean>` has three states: unset (`null`), `true`, and `false`, because every `ValueState` is nullable.
+This job only makes use of unset (`null`) and `true` to check whether the flag is set or not.
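+
+To trace this logic by hand, the following plain-Java sketch replays it over a fixed list of amounts. It is not Flink code: `PatternSketch` and `alertPositions` are hypothetical names, a local `Boolean` stands in for `flagState`, and the amounts are illustrative values chosen to match the $0.09, $510, and $0.02 transactions mentioned earlier.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+
+public class PatternSketch {
+
+    static final double SMALL_AMOUNT = 1.00;
+    static final double LARGE_AMOUNT = 500.00;
+
+    // Returns the 1-based positions of the transactions that would raise an alert.
+    static List<Integer> alertPositions(double[] amounts) {
+        List<Integer> alerts = new ArrayList<>();
+        Boolean flag = null; // stand-in for flagState: null means "unset"
+
+        for (int i = 0; i < amounts.length; i++) {
+            if (flag != null) {
+                if (amounts[i] > LARGE_AMOUNT) {
+                    alerts.add(i + 1);
+                }
+                flag = null; // cleared whether or not an alert was raised
+            }
+            if (amounts[i] < SMALL_AMOUNT) {
+                flag = true; // remember the small transaction for the next event
+            }
+        }
+        return alerts;
+    }
+
+    public static void main(String[] args) {
+        // Illustrative amounts for a single account (assumed, not taken from a real data set).
+        double[] amounts = {13.01, 25.00, 0.09, 510.00, 102.62, 91.50, 0.02, 30.01, 701.83};
+        System.out.println(alertPositions(amounts)); // prints [4]
+    }
+}
+```
+
+Only transaction 4 raises an alert: the $0.02 at position 7 sets the flag, but the $30.01 that follows clears it before the $701.83 arrives.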
+
+## Fraud Detector v2: State + Time = &#10084;&#65039;
+
+Scammers don't wait long to make their large purchases, because a short delay reduces the chance that their test transaction is noticed.
+For example, suppose you wanted to set a 1-minute timeout on your fraud detector; that is, in the previous example, transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other.
+Flink's `KeyedProcessFunction` allows you to set timers that invoke a callback method at some point in time in the future.
+
+Let's see how we can modify our job to comply with our new requirements:
+
+* Whenever the flag is set to `true`, also set a timer for 1 minute in the future.
+* When the timer fires, reset the flag by clearing its state.
+* If the flag is ever cleared, the timer should be canceled.
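+
+These three rules can be sketched in plain Java before bringing in Flink's timer service. This is an illustration under stated assumptions, not Flink code: `TimeoutSketch` is a hypothetical class, the current time is passed in explicitly instead of being read from a clock, and the timer callback is simulated by an expiry check at the start of `process`.
+
+```java
+public class TimeoutSketch {
+
+    static final double SMALL_AMOUNT = 1.00;
+    static final double LARGE_AMOUNT = 500.00;
+    static final long ONE_MINUTE = 60 * 1000;
+
+    private Boolean flag; // null means "not set"
+    private Long timer;   // time at which the pending timer would fire, null if none
+
+    // Processes one transaction at time `now` (milliseconds); returns true on an alert.
+    boolean process(double amount, long now) {
+        // Rule 2: when the timer fires, reset the flag (simulated callback).
+        if (timer != null && now >= timer) {
+            flag = null;
+            timer = null;
+        }
+
+        boolean alert = false;
+        if (flag != null) {
+            alert = amount > LARGE_AMOUNT;
+            // Rule 3: clearing the flag also cancels the timer.
+            flag = null;
+            timer = null;
+        }
+
+        if (amount < SMALL_AMOUNT) {
+            // Rule 1: setting the flag also sets a timer one minute ahead.
+            flag = true;
+            timer = now + ONE_MINUTE;
+        }
+        return alert;
+    }
+
+    public static void main(String[] args) {
+        TimeoutSketch fast = new TimeoutSketch();
+        fast.process(0.09, 0L);
+        System.out.println(fast.process(510.00, 30_000L)); // prints true: within one minute
+
+        TimeoutSketch slow = new TimeoutSketch();
+        slow.process(0.09, 0L);
+        System.out.println(slow.process(510.00, 90_000L)); // prints false: flag expired
+    }
+}
+```
+
+Treating a transaction that arrives exactly at the expiry time as too late is a choice made for this sketch only.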
+
+To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.
+
+{{< tabs "state-v2" >}}
+{{< tab "Java" >}}
+```java
+private transient ValueState<Boolean> flagState;
+private transient ValueState<Long> timerState;
+
+@Override
+public void open(Configuration parameters) {
+    ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+            "flag",
+            Types.BOOLEAN);
+    flagState = getRuntimeContext().getState(flagDescriptor);
+
+    ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+            "timer-state",
+            Types.LONG);
+    timerState = getRuntimeContext().getState(timerDescriptor);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+`KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service.
+The timer service can be used to query the current time, register timers, and delete timers.
+With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`.
+
+{{< tabs "timer-v2" >}}
+{{< tab "Java" >}}
+```java
+if (transaction.getAmount() < SMALL_AMOUNT) {
+    // set the flag to true
+    flagState.update(true);
+
+    // set the timer and timer state
+    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+    context.timerService().registerProcessingTimeTimer(timer);
+    timerState.update(timer);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+  // set the flag to true
+  flagState.update(true)
+
+  // set the timer and timer state
+  val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+  context.timerService.registerProcessingTimeTimer(timer)
+  timerState.update(timer)
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Processing time is wall clock time, and is determined by the system clock of the machine running the operator. 
+
+When a timer fires, it calls `KeyedProcessFunction#onTimer`. 
+Overriding this method is how you can implement your callback to reset the flag.
+
+{{< tabs "timerclear-v2" >}}
+{{< tab "Java" >}}
+```java
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+    // remove flag after 1 minute
+    timerState.clear();
+    flagState.clear();
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+override def onTimer(
+    timestamp: Long,
+    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+    out: Collector[Alert]): Unit = {
+  // remove flag after 1 minute
+  timerState.clear()
+  flagState.clear()
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.
+You can wrap this in a helper method and call this method instead of `flagState.clear()`.
+
+{{< tabs "cleanup-v2" >}}
+{{< tab "Java" >}}
+```java
+private void cleanUp(Context ctx) throws Exception {
+    // delete timer
+    Long timer = timerState.value();
+    ctx.timerService().deleteProcessingTimeTimer(timer);
+
+    // clean up all state
+    timerState.clear();
+    flagState.clear();
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+@throws[Exception]
+private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+  // delete timer
+  val timer = timerState.value
+  ctx.timerService.deleteProcessingTimeTimer(timer)
+
+  // clean up all states
+  timerState.clear()
+  flagState.clear()
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+And that's it, a fully functional, stateful, distributed streaming application!
+
+## Final Application
+
+{{< tabs "finalapplication" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double SMALL_AMOUNT = 1.00;
+    private static final double LARGE_AMOUNT = 500.00;
+    private static final long ONE_MINUTE = 60 * 1000;
+
+    private transient ValueState<Boolean> flagState;
+    private transient ValueState<Long> timerState;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
+                "flag",
+                Types.BOOLEAN);
+        flagState = getRuntimeContext().getState(flagDescriptor);
+
+        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
+                "timer-state",
+                Types.LONG);
+        timerState = getRuntimeContext().getState(timerDescriptor);
+    }
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Get the current state for the current key
+        Boolean lastTransactionWasSmall = flagState.value();
+
+        // Check if the flag is set
+        if (lastTransactionWasSmall != null) {
+            if (transaction.getAmount() > LARGE_AMOUNT) {
+                // Output an alert downstream
+                Alert alert = new Alert();
+                alert.setId(transaction.getAccountId());
+
+                collector.collect(alert);
+            }
+            // Clean up our state
+            cleanUp(context);
+        }
+
+        if (transaction.getAmount() < SMALL_AMOUNT) {
+            // set the flag to true
+            flagState.update(true);
+
+            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
+            context.timerService().registerProcessingTimeTimer(timer);
+
+            timerState.update(timer);
+        }
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
+        // remove flag after 1 minute
+        timerState.clear();
+        flagState.clear();
+    }
+
+    private void cleanUp(Context ctx) throws Exception {
+        // delete timer
+        Long timer = timerState.value();
+        ctx.timerService().deleteProcessingTimeTimer(timer);
+
+        // clean up all state
+        timerState.clear();
+        flagState.clear();
+    }
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+  val SMALL_AMOUNT: Double = 1.00
+  val LARGE_AMOUNT: Double = 500.00
+  val ONE_MINUTE: Long     = 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  @transient private var flagState: ValueState[java.lang.Boolean] = _
+  @transient private var timerState: ValueState[java.lang.Long] = _
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
+    flagState = getRuntimeContext.getState(flagDescriptor)
+
+    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
+    timerState = getRuntimeContext.getState(timerDescriptor)
+  }
+
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Get the current state for the current key
+    val lastTransactionWasSmall = flagState.value
+
+    // Check if the flag is set
+    if (lastTransactionWasSmall != null) {
+      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
+        // Output an alert downstream
+        val alert = new Alert
+        alert.setId(transaction.getAccountId)
+
+        collector.collect(alert)
+      }
+      // Clean up our state
+      cleanUp(context)
+    }
+
+    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
+      // set the flag to true
+      flagState.update(true)
+      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
+
+      context.timerService.registerProcessingTimeTimer(timer)
+      timerState.update(timer)
+    }
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+      out: Collector[Alert]): Unit = {
+    // remove flag after 1 minute
+    timerState.clear()
+    flagState.clear()
+  }
+
+  @throws[Exception]
+  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
+    // delete timer
+    val timer = timerState.value
+    ctx.timerService.deleteProcessingTimeTimer(timer)
+
+    // clean up all states
+    timerState.clear()
+    flagState.clear()
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 ### Expected Output
 
@@ -427,3 +897,9 @@ You should see the following output in your task manager logs:
 2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
 2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
 ```
+
+### Running in an IDE
+
+Running the project in an IDE may result in a `java.lang.NoClassDefFoundError` exception. This is probably because you do not have all required Flink dependencies implicitly loaded into the classpath.
+
+* IntelliJ IDEA: Go to Run > Edit Configurations > Modify options > Select `include dependencies with "Provided" scope`. This run configuration will now include all required classes to run the application from within the IDE.
\ No newline at end of file