Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/01/28 20:19:39 UTC
[jira] [Commented] (FLINK-3216) Define pattern specification
[ https://issues.apache.org/jira/browse/FLINK-3216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122144#comment-15122144 ]
ASF GitHub Bot commented on FLINK-3216:
---------------------------------------
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/1557
[FLINK-3216] [FLINK-3217] [cep] Initial version of CEP library
This PR is the first version of Flink's CEP library.
The key components are the `NFA`, which uses the `SharedBuffer` to efficiently maintain the state of multiple non-deterministic runs. The `NFA` implementation is strongly based on this [paper](https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf).
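The core idea behind sharing state between runs can be sketched in plain Java. Each matched event is stored once, together with a pointer to the entry it extends, so runs that branch from a common prefix reuse that prefix instead of copying it. This is a hypothetical simplification, not Flink's `SharedBuffer`; the names `SharedPrefixSketch`, `Entry`, `extend` and `extract` are made up for illustration. The referenced paper additionally versions buffer entries so that overlapping runs can be told apart, and a real implementation must also prune entries that no run references anymore. Both are omitted here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch of prefix sharing between partial matches (runs).
 * Not Flink's SharedBuffer: no versioning, no pruning.
 */
public class SharedPrefixSketch {

    /** One stored event plus a pointer to the entry it extends (null for the first event). */
    static final class Entry<T> {
        final String stateName;
        final T event;
        final Entry<T> previous;

        Entry(String stateName, T event, Entry<T> previous) {
            this.stateName = stateName;
            this.event = event;
            this.previous = previous;
        }
    }

    /** Extends a run by one event; the existing prefix is shared, not copied. */
    static <T> Entry<T> extend(Entry<T> run, String stateName, T event) {
        return new Entry<>(stateName, event, run);
    }

    /** Reconstructs the full event sequence of a run by walking back the pointers. */
    static <T> List<T> extract(Entry<T> run) {
        List<T> events = new ArrayList<>();
        for (Entry<T> e = run; e != null; e = e.previous) {
            events.add(e.event);
        }
        Collections.reverse(events);
        return events;
    }

    public static void main(String[] args) {
        Entry<String> start = extend(null, "start", "a");
        Entry<String> middle = extend(start, "middle", "b");

        // Two runs branch off the same prefix [a, b]; neither copies it.
        Entry<String> run1 = extend(middle, "end", "c1");
        Entry<String> run2 = extend(middle, "end", "c2");

        System.out.println(extract(run1)); // [a, b, c1]
        System.out.println(extract(run2)); // [a, b, c2]
        System.out.println(run1.previous == run2.previous); // true
    }
}
```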
To define the `NFA`, the library offers the pattern API. The pattern API lets you easily construct complex patterns with type and filter conditions. The specified pattern is then compiled into an `NFA`, which is responsible for detecting the patterns. See the online documentation (docs/libs/cep/index.md) for a full specification of the supported operations.
To run the `NFA`, the library adds two custom stream operators: `CEPPatternOperator` and `KeyedCEPPatternOperator`. The former is used for non-keyed input streams and the latter for keyed data streams. The `CEP.from(input, pattern)` method transparently selects the right operator.
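The practical difference between matching on keyed and non-keyed input can be illustrated with a plain-Java sketch that keeps one piece of matching state per key. It uses hypothetical names and is not Flink code; the "pattern" is only a strict two-step sequence, an event named "a" directly followed by "b". On a single shared sequence, events belonging to other keys can break such a match, whereas per-key state isolates each key's events. How `KeyedCEPPatternOperator` actually stores its per-key state is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch (not Flink code): a strict "a" -> "b" check
 * that keeps a separate "previous event" slot per key.
 */
public class KeyedMatchingSketch {

    static final class Event {
        final int id;
        final String name;

        Event(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Runs the strict a -> b check with one "previous event" slot per key. */
    static <K> List<String> detect(List<Event> input, Function<Event, K> keySelector) {
        Map<K, Event> previousByKey = new HashMap<>();
        List<String> matches = new ArrayList<>();
        for (Event e : input) {
            K key = keySelector.apply(e);
            // put() returns the event previously stored for this key, or null.
            Event previous = previousByKey.put(key, e);
            if (previous != null && previous.name.equals("a") && e.name.equals("b")) {
                matches.add("key=" + key);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
                new Event(1, "a"), new Event(2, "c"), new Event(1, "b"));

        // Keyed by id: id 1 sees a, b and matches; id 2's "c" does not interfere.
        System.out.println(detect(input, e -> e.id)); // [key=1]

        // One constant key (non-keyed): the sequence is a, c, b, so "a" is not directly followed by "b".
        System.out.println(detect(input, e -> 0)); // []
    }
}
```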
`CEP.from(input, pattern)` returns a `PatternStream`, which contains the matched event sequences. The event sequences can be processed by specifying a `PatternSelectFunction` or a `PatternFlatSelectFunction`. Both functions receive the detected pattern as a `Map<String, T>`, where `T` is the type of the input data stream. Each event is matched against a state of the pattern, and the name of that state is the event's key in the map.
An example of the API can be seen next:
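```java
import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event and Alert are user-defined types.
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

// Key the stream by event id so that patterns are matched per id.
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
});

// "start" matches any event, "middle" must directly follow it,
// "end" may follow later, and the whole sequence must complete within 10 seconds.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("name");
        }
    }).followedBy("end").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.from(partitionedInput, pattern);

// Each match arrives as a map from state name to the matched event.
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, Event> pattern) throws Exception {
        return new Alert(pattern.get("start"), pattern.get("end"));
    }
});
```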
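The example only uses `select`. To contrast it with the flat variant, here is a plain-Java sketch with hypothetical stand-ins (`SelectSketch`, `select`, `flatSelect`, `match` are made up; Flink's actual interfaces are not used or reproduced). It assumes, as the names suggest, that a select function produces exactly one result per match while a flat select function may emit zero or more results per match. Matches are modeled as `Map<String, T>` keyed by state name, as described for `PatternStream`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical stand-ins contrasting one-to-one and one-to-many processing of matches.
 * Not Flink's PatternSelectFunction / PatternFlatSelectFunction.
 */
public class SelectSketch {

    /** One output per match, like a select function. */
    static <T, R> List<R> select(List<Map<String, T>> matches, Function<Map<String, T>, R> fn) {
        List<R> out = new ArrayList<>();
        for (Map<String, T> match : matches) {
            out.add(fn.apply(match));
        }
        return out;
    }

    /** Zero or more outputs per match, emitted through a callback. */
    static <T, R> List<R> flatSelect(List<Map<String, T>> matches,
                                     BiConsumer<Map<String, T>, Consumer<R>> fn) {
        List<R> out = new ArrayList<>();
        for (Map<String, T> match : matches) {
            fn.accept(match, out::add);
        }
        return out;
    }

    /** Builds a match map keyed by state name. */
    static Map<String, String> match(String start, String middle, String end) {
        Map<String, String> m = new HashMap<>();
        m.put("start", start);
        m.put("middle", middle);
        m.put("end", end);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, String>> matches = Arrays.asList(
                match("s1", "m1", "e1"), match("s2", "m2", "critical"));

        // select: exactly one result per match.
        List<String> alerts = select(matches, m -> m.get("start") + "->" + m.get("end"));
        System.out.println(alerts); // [s1->e1, s2->critical]

        // flatSelect: emit both events only for matches ending in "critical", nothing otherwise.
        List<String> flagged = flatSelect(matches, (m, out) -> {
            if (m.get("end").equals("critical")) {
                out.accept(m.get("start"));
                out.accept(m.get("end"));
            }
        });
        System.out.println(flagged); // [s2, critical]
    }
}
```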
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink cep
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1557.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1557
----
commit 5deda066c5f4851a426b15a05994be0796f5e6f3
Author: Till Rohrmann <tr...@apache.org>
Date: 2016-01-14T09:04:23Z
[FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
Implements NFA using the SharedBuffer
Implements NFACompiler to compile a Pattern into a NFA
Add CEP operator
Makes NFA and SharedBuffer serializable
Add serializability support to SharedBuffer and NFA
Add keyed cep pattern operator
commit 6459946a62985e08c50ca397447af42662ec6558
Author: Till Rohrmann <tr...@apache.org>
Date: 2016-01-28T15:41:01Z
Adds CEP documentation
Adds online documentation for the CEP library
----
> Define pattern specification
> ----------------------------
>
> Key: FLINK-3216
> URL: https://issues.apache.org/jira/browse/FLINK-3216
> Project: Flink
> Issue Type: Sub-task
> Reporter: Till Rohrmann
>
> In order to detect event patterns, we first have to define the pattern. This issue tracks the progress of implementing a user-facing API to define event patterns.
> Patterns should support the following operations:
> * next(): The given event has to follow directly after the preceding event
> * followedBy(): The given event has to follow the preceding event. Other events may occur in between.
> * every(): In a follow-by relationship, a starting event can be matched with multiple successive events. Consider the pattern a → b, where → denotes the follow-by relationship. The event sequence a, b, b can be matched as a, b or as a, (b), b, where the first b is left out. The essential question is whether a is allowed to match multiple times or only the first time. The method every() specifies exactly that: events marked with every() can match multiple successive events. This only makes sense in a follow-by relationship, though.
> * followedByEvery(): Similar to followedBy(), except that the specified element can be matched with multiple successive events
> * or(): Alternative event which can be matched instead of the original event: every(“e1”).where().or(“e2”).where()
> * within(): Defines a time interval in which the pattern has to be completed, otherwise an incomplete pattern can be emitted (timeout case)
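To make the difference between next(), followedBy() and every()/followedByEvery() concrete, here is a plain-Java sketch for a two-step pattern a → b. It is a hypothetical illustration (`ContiguitySketch`, `Mode` and `match` are made-up names), not the CEP library's matching code. It models next() as strict contiguity, followedBy() as pairing a with the first later b, and every()/followedByEvery() as pairing a with every later b; that last reading is an assumption based on the issue text. or() and within() are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of the contiguity options for a two-step pattern first -> second.
 * Events are plain strings; a match is reported as "i,j" (event indices).
 */
public class ContiguitySketch {

    enum Mode { NEXT, FOLLOWED_BY, FOLLOWED_BY_EVERY }

    static List<String> match(List<String> events, String first, String second, Mode mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(first)) {
                continue; // only events equal to `first` start a partial match
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals(second)) {
                    matches.add(i + "," + j);
                    if (mode != Mode.FOLLOWED_BY_EVERY) {
                        break; // next/followedBy: pair the start event with the first successor only
                    }
                } else if (mode == Mode.NEXT) {
                    break; // next: any other event in between ends the attempt
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "c", "b", "b");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " " + match(events, "a", "b", mode));
        }
        // NEXT []
        // FOLLOWED_BY [0,2]
        // FOLLOWED_BY_EVERY [0,2, 0,3]
    }
}
```

With the issue's own sequence a, b, b, next() and followedBy() both yield only (a, first b), while the every() mode also pairs a with the second b.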
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)