Posted to commits@flink.apache.org by dw...@apache.org on 2018/12/30 14:16:34 UTC
[flink] 07/08: [FLINK-10596][cep][docs] Updated cep docs with PatternProcessFunction
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2b8edec2e833ed3216dddd2e386e77b2c2d50467
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 09:20:10 2018 +0100
[FLINK-10596][cep][docs] Updated cep docs with PatternProcessFunction
---
docs/dev/libs/cep.md | 158 ++++++++++++++++++++-------------------------------
1 file changed, 62 insertions(+), 96 deletions(-)
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ce9a9c3..0519dbc 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -101,14 +101,16 @@ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
-DataStream<Alert> result = patternStream.select(
- new PatternSelectFunction<Event, Alert>() {
+DataStream<Alert> result = patternStream.process(
+ new PatternProcessFunction<Event, Alert>() {
@Override
- public Alert select(Map<String, List<Event>> pattern) throws Exception {
- return createAlertFrom(pattern);
+ public void processMatch(
+ Map<String, List<Event>> pattern,
+ Context ctx,
+ Collector<Alert> out) throws Exception {
+ out.collect(createAlertFrom(pattern));
}
- }
-});
+ });
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
@@ -121,7 +123,15 @@ val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
val patternStream = CEP.pattern(input, pattern)
-val result: DataStream[Alert] = patternStream.select(createAlert(_))
+val result: DataStream[Alert] = patternStream.process(
+ new PatternProcessFunction[Event, Alert]() {
+ override def processMatch(
+ `match`: util.Map[String, util.List[Event]],
+ ctx: PatternProcessFunction.Context,
+ out: Collector[Alert]): Unit = {
+ out.collect(createAlertFrom(`match`))
+ }
+ })
{% endhighlight %}
</div>
</div>
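The change above replaces a `select` call, which returns exactly one result per match, with `process`, whose `processMatch` pushes results into a `Collector`. The following is a minimal plain-Java sketch of that calling convention with no Flink dependency; `SimpleCollector`, `MatchProcessor` and `ProcessStyleDemo` are hypothetical names used only for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a collector: whatever is passed to collect() is an output record.
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical process-style callback: called once per match, and it may emit
// zero, one, or many results through the collector.
interface MatchProcessor<IN, OUT> {
    void processMatch(Map<String, List<IN>> match, SimpleCollector<OUT> out);
}

class ProcessStyleDemo {
    // Feeds every match to the processor and gathers everything it emitted, in order.
    static <IN, OUT> List<OUT> runAll(List<Map<String, List<IN>>> matches,
                                      MatchProcessor<IN, OUT> processor) {
        List<OUT> emitted = new ArrayList<>();
        for (Map<String, List<IN>> match : matches) {
            processor.processMatch(match, emitted::add);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // One alert per event accepted by "start"; a match without "start" emits nothing.
        MatchProcessor<Integer, String> alerts = (match, out) -> {
            for (Integer id : match.getOrDefault("start", List.of())) {
                out.collect("alert for event " + id);
            }
        };
        List<Map<String, List<Integer>>> matches = List.of(
                Map.of("start", List.of(42)),
                Map.of("start", List.of(42, 43)),
                Map.of("end", List.of(7)));
        System.out.println(runAll(matches, alerts));
    }
}
```

Running `main` prints `[alert for event 42, alert for event 42, alert for event 43]`: the second match yields two results and the third yields none, which a one-result-per-call `select` function could not express.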
@@ -1477,92 +1487,61 @@ The input stream can be *keyed* or *non-keyed* depending on your use-case.
### Selecting from Patterns
-Once you have obtained a `PatternStream` you can select from detected event sequences via the `select` or `flatSelect` methods.
+Once you have obtained a `PatternStream` you can apply transformations to detected event sequences. The recommended way to do that
+is with a `PatternProcessFunction`.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-The `select()` method requires a `PatternSelectFunction` implementation.
-A `PatternSelectFunction` has a `select` method which is called for each matching event sequence.
+A `PatternProcessFunction` has a `processMatch` method which is called for each matching event sequence.
It receives a match in the form of `Map<String, List<IN>>` where the key is the name of each pattern in your pattern
sequence and the value is a list of all accepted events for that pattern (`IN` is the type of your input elements).
The events for a given pattern are ordered by timestamp. The reason for returning a list of accepted events for each
-pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a given pattern. The selection function returns exactly one result.
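+pattern is that when using looping patterns (e.g. `oneOrMore()` and `times()`), more than one event may be accepted for a given pattern. Results are emitted through the `Collector`, so `processMatch` may produce any number of them per match.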
{% highlight java %}
-class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
+class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
@Override
- public OUT select(Map<String, List<IN>> pattern) {
- IN startEvent = pattern.get("start").get(0);
- IN endEvent = pattern.get("end").get(0);
- return new OUT(startEvent, endEvent);
+ public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+ IN startEvent = match.get("start").get(0);
+ IN endEvent = match.get("end").get(0);
+ out.collect(new OUT(startEvent, endEvent));
}
}
{% endhighlight %}
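To make the shape of a match concrete, here is a plain-Java sketch assuming a hypothetical `Reading` event type and a pattern with stages named `start`, `middle` (a looping stage, e.g. built with `oneOrMore()`) and `end`. It builds the `Map<String, List<IN>>` by hand and reads it the way a `processMatch` body might; it does not use Flink, and `MatchShapeDemo` is a hypothetical name.

```java
import java.util.List;
import java.util.Map;

// Hypothetical event type used only for this illustration.
final class Reading {
    final String sensor;
    final long timestamp;
    final double value;

    Reading(String sensor, long timestamp, double value) {
        this.sensor = sensor;
        this.timestamp = timestamp;
        this.value = value;
    }
}

class MatchShapeDemo {
    // A looping stage can hold several events (ordered by timestamp), so read the
    // first and last explicitly instead of assuming a single element.
    static Reading first(Map<String, List<Reading>> match, String patternName) {
        return match.get(patternName).get(0);
    }

    static Reading last(Map<String, List<Reading>> match, String patternName) {
        List<Reading> events = match.get(patternName);
        return events.get(events.size() - 1);
    }

    // Average value over all events accepted by one (possibly looping) stage.
    static double average(Map<String, List<Reading>> match, String patternName) {
        return match.get(patternName).stream()
                .mapToDouble(r -> r.value)
                .average()
                .orElse(Double.NaN);
    }

    public static void main(String[] args) {
        // A hand-built match of start -> middle (looping) -> end.
        Map<String, List<Reading>> match = Map.of(
                "start", List.of(new Reading("s1", 1_000L, 10.0)),
                "middle", List.of(new Reading("s1", 2_000L, 20.0),
                                  new Reading("s1", 3_000L, 30.0)),
                "end", List.of(new Reading("s1", 4_000L, 5.0)));
        System.out.println(first(match, "middle").timestamp + " .. " + last(match, "middle").timestamp);
        System.out.println(average(match, "middle"));
    }
}
```

With this match the looping `middle` stage holds two events, so the program prints `2000 .. 3000` and then `25.0`.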
-A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an
-arbitrary number of results. To do this, the `select` method has an additional `Collector` parameter which is
-used to forward your output elements downstream.
+The `PatternProcessFunction` gives access to a `Context` object. Through it, one can access time-related
+characteristics such as `currentProcessingTime()` or the `timestamp()` of the current match (which is the timestamp of the last element assigned to the match).
+Through this context one can also emit results to a [side-output]({{ site.baseurl }}/dev/stream/side_output.html).
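A hedged sketch of how the match timestamp and a side output might be used together: it measures how long a match took (match timestamp minus the timestamp of its first event) and routes slow matches to a side output. `MatchContext`, `RecordingContext`, `ContextRoutingDemo` and the `slow-matches` name are hypothetical, simplified stand-ins; Flink's `Context` addresses side outputs by an `OutputTag`, not by a string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified mirror of the Context described above.
interface MatchContext {
    long timestamp();                      // timestamp of the match (its last assigned element)
    long currentProcessingTime();          // wall-clock time
    void output(String tag, Object value); // side output, addressed by a name in this sketch
}

// Keeps side outputs in memory so the routing can be inspected.
class RecordingContext implements MatchContext {
    private final long matchTimestamp;
    final Map<String, List<Object>> sideOutputs = new HashMap<>();

    RecordingContext(long matchTimestamp) {
        this.matchTimestamp = matchTimestamp;
    }

    @Override
    public long timestamp() {
        return matchTimestamp;
    }

    @Override
    public long currentProcessingTime() {
        return System.currentTimeMillis();
    }

    @Override
    public void output(String tag, Object value) {
        sideOutputs.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
    }
}

class ContextRoutingDemo {
    static final String SLOW_MATCHES = "slow-matches"; // hypothetical side-output name

    // Emits the match duration to the main output, or to the side output
    // when it exceeds maxDurationMs.
    static void processMatch(long firstEventTimestamp, long maxDurationMs,
                             MatchContext ctx, List<Long> mainOut) {
        long duration = ctx.timestamp() - firstEventTimestamp;
        if (duration > maxDurationMs) {
            ctx.output(SLOW_MATCHES, duration);
        } else {
            mainOut.add(duration);
        }
    }

    public static void main(String[] args) {
        List<Long> mainOut = new ArrayList<>();
        RecordingContext fast = new RecordingContext(2_000L);
        processMatch(1_000L, 5_000L, fast, mainOut);
        RecordingContext slow = new RecordingContext(9_000L);
        processMatch(1_000L, 5_000L, slow, mainOut);
        System.out.println(mainOut + " " + slow.sideOutputs);
    }
}
```

`main` prints `[1000] {slow-matches=[8000]}`: the fast match lands on the main output, the slow one only on the side output.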
-{% highlight java %}
-class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
- @Override
- public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
- IN startEvent = pattern.get("start").get(0);
- IN endEvent = pattern.get("end").get(0);
- for (int i = 0; i < startEvent.getValue(); i++ ) {
- collector.collect(new OUT(startEvent, endEvent));
- }
- }
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-The `select()` method takes a selection function as argument, which is called for each matching event sequence.
-It receives a match in the form of `Map[String, Iterable[IN]]` where the key is the name of each pattern in your pattern
-sequence and the value is an Iterable over all accepted events for that pattern (`IN` is the type of your input elements).
+#### Handling Timed Out Partial Patterns
-The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a given pattern. The selection function returns exactly one result per call.
-
-{% highlight scala %}
-def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
- val startEvent = pattern.get("start").get.head
- val endEvent = pattern.get("end").get.head
- OUT(startEvent, endEvent)
-}
-{% endhighlight %}
+Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences
+are discarded because they exceed the window length. To act upon a timed out partial match, one can use the `TimedOutPartialMatchHandler` interface.
+The interface is meant to be used in a mixin style: your `PatternProcessFunction` additionally implements this interface.
+The `TimedOutPartialMatchHandler` provides the additional `processTimedOutMatch` method, which is called for every timed out partial match.
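The pruning described above can be pictured with a small model. This sketch assumes a partial match is identified only by the timestamp of its first event and that it times out once the elapsed time reaches the `within` window; whether the boundary is inclusive or exclusive is an assumption here, not something this page states. `WindowTimeoutDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class WindowTimeoutDemo {
    // Assumption: a partial match is too old once currentTime - start >= window.
    static boolean isTimedOut(long firstEventTimestamp, long currentTime, long windowMs) {
        return currentTime - firstEventTimestamp >= windowMs;
    }

    // Removes timed-out partial matches (represented only by the timestamp of their
    // first event) from openMatches and returns them, one per would-be handler call.
    static List<Long> advanceTime(List<Long> openMatches, long currentTime, long windowMs) {
        List<Long> timedOut = new ArrayList<>();
        Iterator<Long> it = openMatches.iterator();
        while (it.hasNext()) {
            long start = it.next();
            if (isTimedOut(start, currentTime, windowMs)) {
                timedOut.add(start);
                it.remove();
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        List<Long> open = new ArrayList<>(List.of(1_000L, 8_000L, 15_000L));
        List<Long> expired = advanceTime(open, 20_000L, 10_000L);
        System.out.println("timed out: " + expired + ", still open: " + open);
    }
}
```

`main` prints `timed out: [1000, 8000], still open: [15000]`; each entry in the first list marks where a `processTimedOutMatch` call would happen.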
-The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the
-`flatSelect` method can return an arbitrary number of results per call. In order to do this, the function for
-`flatSelect` has an additional `Collector` parameter which is used to forward your output elements downstream.
+{% highlight java %}
+class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
+ @Override
+ public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+ // ...
+ }
-{% highlight scala %}
-def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {
- val startEvent = pattern.get("start").get.head
- val endEvent = pattern.get("end").get.head
- for (i <- 0 to startEvent.getValue) {
- collector.collect(OUT(startEvent, endEvent))
+ @Override
+ public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception {
+ IN startEvent = match.get("start").get(0);
+ ctx.output(outputTag, new T(startEvent)); // outputTag: an OutputTag<T> defined elsewhere
}
}
{% endhighlight %}
-</div>
-</div>
-### Handling Timed Out Partial Patterns
+<span class="label label-info">Note</span> The `processTimedOutMatch` method does not have access to the main output. You can still emit results
+through [side-outputs]({{ site.baseurl }}/dev/stream/side_output.html) via the `Context` object.
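The mixin style can be modeled in plain Java: the base class only knows about complete matches, and the runtime checks whether the function also implements the extra interface before handing it a timed-out partial match, giving it only a side output. `SketchProcessFunction`, `SketchTimeoutHandler`, `MixinDispatchDemo` and `StartEndFunction` are hypothetical stand-ins, and the `instanceof` dispatch is an assumption about how such a runtime could work, not a description of Flink internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical base class: handles complete matches and writes to the main output.
abstract class SketchProcessFunction<IN, OUT> {
    abstract void processMatch(Map<String, List<IN>> match, List<OUT> mainOut);
}

// Optional capability: only functions implementing this see timed-out partial
// matches, and they receive a side output only, never the main output.
interface SketchTimeoutHandler<IN> {
    void processTimedOutMatch(Map<String, List<IN>> partialMatch, List<Object> sideOut);
}

class MixinDispatchDemo {
    // Calls the timeout callback only if the function opted in via the interface;
    // otherwise the timed-out partial match is dropped.
    static <IN, OUT> void onTimedOut(SketchProcessFunction<IN, OUT> fn,
                                     Map<String, List<IN>> partialMatch,
                                     List<Object> sideOut) {
        if (fn instanceof SketchTimeoutHandler) {
            @SuppressWarnings("unchecked")
            SketchTimeoutHandler<IN> handler = (SketchTimeoutHandler<IN>) fn;
            handler.processTimedOutMatch(partialMatch, sideOut);
        }
    }

    public static void main(String[] args) {
        List<String> mainOut = new ArrayList<>();
        List<Object> sideOut = new ArrayList<>();
        StartEndFunction fn = new StartEndFunction();
        fn.processMatch(Map.of("start", List.of("a"), "end", List.of("b")), mainOut);
        onTimedOut(fn, Map.of("start", List.of("c")), sideOut);
        System.out.println(mainOut + " " + sideOut);
    }
}

// Uses both: complete matches go to the main output, partial ones to the side output.
class StartEndFunction extends SketchProcessFunction<String, String>
        implements SketchTimeoutHandler<String> {
    @Override
    void processMatch(Map<String, List<String>> match, List<String> mainOut) {
        mainOut.add(match.get("start").get(0) + "->" + match.get("end").get(0));
    }

    @Override
    public void processTimedOutMatch(Map<String, List<String>> partialMatch, List<Object> sideOut) {
        sideOut.add("timed out after " + partialMatch.get("start").get(0));
    }
}
```

`main` prints `[a->b] [timed out after c]`. A function that does not implement `SketchTimeoutHandler` never sees timed-out partial matches.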
-Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences
-are discarded because they exceed the window length. To react to these timed out partial matches the `select`
-and `flatSelect` API calls allow you to specify a timeout handler. This timeout handler is called for each timed out
-partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and
-the timestamp when the timeout was detected.
-To treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as
-parameters
+#### Convenience API
- * `PatternTimeoutFunction`/`PatternFlatTimeoutFunction`
- * [OutputTag]({{ site.baseurl }}/dev/stream/side_output.html) for the side output in which the timed out matches will be returned
- * and the known `PatternSelectFunction`/`PatternFlatSelectFunction`.
+The aforementioned `PatternProcessFunction` was introduced in Flink 1.8, and since then it has been the recommended way to interact with matches.
+One can still use the old-style API such as `select`/`flatSelect`, which is internally translated into a `PatternProcessFunction`.
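Translating the old-style functions into the process-style shape amounts to a thin adapter. The sketch below shows one way such an adapter could look, using hypothetical `ProcessLike` and `SelectAdapterDemo` types plus the standard `java.util.function` interfaces; it is not Flink's internal adapter code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical collector-based callback, standing in for the process-style shape.
interface ProcessLike<IN, OUT> {
    void process(Map<String, List<IN>> match, List<OUT> out);
}

class SelectAdapterDemo {
    // A select-style function returns exactly one result per match;
    // the adapter emits that single value.
    static <IN, OUT> ProcessLike<IN, OUT> fromSelect(Function<Map<String, List<IN>>, OUT> select) {
        return (match, out) -> out.add(select.apply(match));
    }

    // A flatSelect-style function already pushes any number of results into a
    // consumer; the adapter points that consumer at the output list.
    static <IN, OUT> ProcessLike<IN, OUT> fromFlatSelect(
            BiConsumer<Map<String, List<IN>>, Consumer<OUT>> flatSelect) {
        return (match, out) -> flatSelect.accept(match, out::add);
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> match = Map.of("start", List.of(1, 2), "end", List.of(10));

        Function<Map<String, List<Integer>>, Integer> select = m -> m.get("end").get(0);
        BiConsumer<Map<String, List<Integer>>, Consumer<Integer>> flatSelect =
                (m, collector) -> m.get("start").forEach(collector);

        List<Integer> out = new ArrayList<>();
        fromSelect(select).process(match, out);
        fromFlatSelect(flatSelect).process(match, out);
        System.out.println(out);
    }
}
```

`main` prints `[10, 1, 2]`: the select-style function contributes exactly one value, the flatSelect-style one contributes one value per `start` event.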
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -1572,18 +1551,21 @@ PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<TimeoutEvent> outputTag = new OutputTag<TimeoutEvent>("side-output"){};
-SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
- outputTag,
- new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
- new PatternSelectFunction<Event, ComplexEvent>() {...}
-);
-
-DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);
-
SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
outputTag,
- new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
- new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
+ new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {
+ public void timeout(
+ Map<String, List<Event>> pattern,
+ long timeoutTimestamp,
+ Collector<TimeoutEvent> out) throws Exception {
+ out.collect(new TimeoutEvent());
+ }
+ },
+ new PatternFlatSelectFunction<Event, ComplexEvent>() {
+ public void flatSelect(Map<String, List<Event>> pattern, Collector<ComplexEvent> out) throws Exception {
+ out.collect(new ComplexEvent());
+ }
+ }
);
DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
@@ -1594,23 +1576,7 @@ DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag)
<div data-lang="scala" markdown="1">
{% highlight scala %}
-val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
-
-val outputTag = OutputTag[String]("side-output")
-
-val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
- (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
-} {
- pattern: Map[String, Iterable[Event]] => ComplexEvent()
-}
-val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
-{% endhighlight %}
-
-The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
-In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`. You can use the collector to emit an arbitrary number of events.
-
-{% highlight scala %}
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val outputTag = OutputTag[TimeoutEvent]("side-output")