Posted to commits@flink.apache.org by dw...@apache.org on 2018/12/30 14:16:34 UTC

[flink] 07/08: [FLINK-10596][cep][docs] Updated cep docs with PatternProcessFunction

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b8edec2e833ed3216dddd2e386e77b2c2d50467
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 09:20:10 2018 +0100

    [FLINK-10596][cep][docs] Updated cep docs with PatternProcessFunction
---
 docs/dev/libs/cep.md | 158 ++++++++++++++++++++-------------------------------
 1 file changed, 62 insertions(+), 96 deletions(-)

diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ce9a9c3..0519dbc 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -101,14 +101,16 @@ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
 
 PatternStream<Event> patternStream = CEP.pattern(input, pattern);
 
-DataStream<Alert> result = patternStream.select(
-    new PatternSelectFunction<Event, Alert>() {
+DataStream<Alert> result = patternStream.process(
+    new PatternProcessFunction<Event, Alert>() {
         @Override
-        public Alert select(Map<String, List<Event>> pattern) throws Exception {
-            return createAlertFrom(pattern);
+        public void processMatch(
+                Map<String, List<Event>> pattern,
+                Context ctx,
+                Collector<Alert> out) throws Exception {
+            out.collect(createAlertFrom(pattern));
         }
-    }
-});
+    });
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -121,7 +123,15 @@ val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
 
 val patternStream = CEP.pattern(input, pattern)
 
-val result: DataStream[Alert] = patternStream.select(createAlert(_))
+val result: DataStream[Alert] = patternStream.process(
+    new PatternProcessFunction[Event, Alert]() {
+        override def processMatch(
+              `match`: util.Map[String, util.List[Event]],
+              ctx: PatternProcessFunction.Context,
+              out: Collector[Alert]): Unit = {
+            out.collect(createAlertFrom(`match`))
+        }
+    })
 {% endhighlight %}
 </div>
 </div>
@@ -1477,92 +1487,61 @@ The input stream can be *keyed* or *non-keyed* depending on your use-case.
 
 ### Selecting from Patterns
 
-Once you have obtained a `PatternStream` you can select from detected event sequences via the `select` or `flatSelect` methods.
+Once you have obtained a `PatternStream` you can apply a transformation to the detected event sequences. The suggested way of doing that
+is with a `PatternProcessFunction`.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-The `select()` method requires a `PatternSelectFunction` implementation.
-A `PatternSelectFunction` has a `select` method which is called for each matching event sequence.
+A `PatternProcessFunction` has a `processMatch` method which is called for each matching event sequence.
 It receives a match in the form of `Map<String, List<IN>>` where the key is the name of each pattern in your pattern
 sequence and the value is a list of all accepted events for that pattern (`IN` is the type of your input elements).
 The events for a given pattern are ordered by timestamp. The reason for returning a list of accepted events for each
-pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a given pattern. The selection function returns exactly one result.
+pattern is that when using looping patterns (e.g. `oneOrMore()` and `times()`), more than one event may be accepted for a given pattern.
 
 {% highlight java %}
-class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
+class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
     @Override
-    public OUT select(Map<String, List<IN>> pattern) {
-        IN startEvent = pattern.get("start").get(0);
-        IN endEvent = pattern.get("end").get(0);
-        return new OUT(startEvent, endEvent);
+    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+        IN startEvent = match.get("start").get(0);
+        IN endEvent = match.get("end").get(0);
+        out.collect(new OUT(startEvent, endEvent));
     }
 }
 {% endhighlight %}
 
-A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an
-arbitrary number of results. To do this, the `select` method has an additional `Collector` parameter which is
-used to forward your output elements downstream.
+The `PatternProcessFunction` gives access to a `Context` object. Through it, one can access time-related
+characteristics such as `currentProcessingTime` or the `timestamp` of the current match (which is the timestamp of the last element assigned to the match).
+Through this context one can also emit results to a [side-output]({{ site.baseurl }}/dev/stream/side_output.html).
 
-{% highlight java %}
-class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
-    @Override
-    public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
-        IN startEvent = pattern.get("start").get(0);
-        IN endEvent = pattern.get("end").get(0);
 
-        for (int i = 0; i < startEvent.getValue(); i++ ) {
-            collector.collect(new OUT(startEvent, endEvent));
-        }
-    }
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-The `select()` method takes a selection function as argument, which is called for each matching event sequence.
-It receives a match in the form of `Map[String, Iterable[IN]]` where the key is the name of each pattern in your pattern
-sequence and the value is an Iterable over all accepted events for that pattern (`IN` is the type of your input elements).
+#### Handling Timed Out Partial Patterns
 
-The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a given pattern. The selection function returns exactly one result per call.
-
-{% highlight scala %}
-def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
-    val startEvent = pattern.get("start").get.head
-    val endEvent = pattern.get("end").get.head
-    OUT(startEvent, endEvent)
-}
-{% endhighlight %}
+Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences
+are discarded because they exceed the window length. To act upon a timed out partial match one can use the `TimedOutPartialMatchHandler` interface.
+The interface is meant to be used in a mixin style. This means you can implement it in addition to extending `PatternProcessFunction`.
+The `TimedOutPartialMatchHandler` provides the additional `processTimedOutMatch` method, which is called for every timed out partial match.
 
-The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the
-`flatSelect` method can return an arbitrary number of results per call. In order to do this, the function for
-`flatSelect` has an additional `Collector` parameter which is used to forward your output elements downstream.
+{% highlight java %}
+class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
+    @Override
+    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+        ...
+    }
 
-{% highlight scala %}
-def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {
-    val startEvent = pattern.get("start").get.head
-    val endEvent = pattern.get("end").get.head
-    for (i <- 0 to startEvent.getValue) {
-        collector.collect(OUT(startEvent, endEvent))
+    @Override
+    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception {
+        IN startEvent = match.get("start").get(0);
+        ctx.output(outputTag, new T(startEvent));
     }
 }
 {% endhighlight %}
-</div>
-</div>
 
-### Handling Timed Out Partial Patterns
+<span class="label label-info">Note</span> The `processTimedOutMatch` does not give one access to the main output. You can still emit results
+through [side-outputs]({{ site.baseurl }}/dev/stream/side_output.html) by using the `Context` object.
 
-Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences
-are discarded because they exceed the window length. To react to these timed out partial matches the `select`
-and `flatSelect` API calls allow you to specify a timeout handler. This timeout handler is called for each timed out
-partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and
-the timestamp when the timeout was detected.
 
-To treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as
-parameters
+#### Convenience API
 
- * `PatternTimeoutFunction`/`PatternFlatTimeoutFunction`
- * [OutputTag]({{ site.baseurl }}/dev/stream/side_output.html) for the side output in which the timed out matches will be returned
- * and the known `PatternSelectFunction`/`PatternFlatSelectFunction`.
+The aforementioned `PatternProcessFunction` was introduced in Flink 1.8 and has since been the recommended way to interact with matches.
+One can still use the old-style API, such as `select`/`flatSelect`, which is internally translated into a `PatternProcessFunction`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1572,18 +1551,21 @@ PatternStream<Event> patternStream = CEP.pattern(input, pattern);
 
 OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
 
-SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
-    outputTag,
-    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
-    new PatternSelectFunction<Event, ComplexEvent>() {...}
-);
-
-DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);
-
 SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
     outputTag,
-    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
-    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
+    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {
+        public void timeout(
+                Map<String, List<Event>> pattern,
+                long timeoutTimestamp,
+                Collector<TimeoutEvent> out) throws Exception {
+            out.collect(new TimeoutEvent());
+        }
+    },
+    new PatternFlatSelectFunction<Event, ComplexEvent>() {
+        public void flatSelect(Map<String, List<Event>> pattern, Collector<ComplexEvent> out) throws Exception {
+            out.collect(new ComplexEvent());
+        }
+    }
 );
 
 DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
@@ -1594,23 +1576,7 @@ DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag)
 <div data-lang="scala" markdown="1">
 
 {% highlight scala %}
-val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
-
-val outputTag = OutputTag[String]("side-output")
-
-val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
-    (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
-} {
-    pattern: Map[String, Iterable[Event]] => ComplexEvent()
-}
 
-val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
-{% endhighlight %}
-
-The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
-In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`. You can use the collector to emit an arbitrary number of events.
-
-{% highlight scala %}
 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 
 val outputTag = OutputTag[String]("side-output")