Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/01/28 20:19:39 UTC

[jira] [Commented] (FLINK-3216) Define pattern specification

    [ https://issues.apache.org/jira/browse/FLINK-3216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122144#comment-15122144 ] 

ASF GitHub Bot commented on FLINK-3216:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1557

    [FLINK-3216] [FLINK-3217] [cep] Initial version of CEP library

    This PR adds the first version of Flink's CEP library.
    
    The key components are the `NFA` and the `SharedBuffer`, which the `NFA` uses to efficiently maintain the state of multiple non-deterministic runs. The `NFA` implementation is strongly based on this [paper](https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf).
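
    The idea behind the shared buffer can be illustrated with a much simplified sketch. `SharedPrefixBuffer` is a hypothetical class, not Flink's `SharedBuffer` API. It also leaves out the version numbers the paper uses to tell apart runs whose entries have several predecessors. Every matched event is stored once, and each run keeps only a pointer to the last entry of its partial match. A complete match is recovered by following the predecessor links back to the start.

    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical, simplified illustration of prefix sharing between NFA runs.
    // Not Flink's SharedBuffer: there is no versioning, so every entry has
    // exactly one predecessor.
    public class SharedPrefixBuffer<T> {

        /** One stored event plus a link to the entry that preceded it in the match. */
        public static final class Entry<T> {
            final String stateName;
            final T event;
            final Entry<T> previous; // null for the first event of a match

            Entry(String stateName, T event, Entry<T> previous) {
                this.stateName = stateName;
                this.event = event;
                this.previous = previous;
            }
        }

        private int storedEntries = 0;

        /** Stores an event once and links it to a run's current tail (null starts a run). */
        public Entry<T> put(String stateName, T event, Entry<T> previous) {
            storedEntries++;
            return new Entry<>(stateName, event, previous);
        }

        /** Number of stored events; a shared prefix is counted only once. */
        public int size() {
            return storedEntries;
        }

        /** Follows the predecessor links and returns the match as state name -> event. */
        public Map<String, T> extract(Entry<T> tail) {
            List<Entry<T>> path = new ArrayList<>();
            for (Entry<T> e = tail; e != null; e = e.previous) {
                path.add(e);
            }
            Collections.reverse(path);
            Map<String, T> match = new LinkedHashMap<>();
            for (Entry<T> e : path) {
                match.put(e.stateName, e.event);
            }
            return match;
        }
    }
    ```

    Suppose two runs share the prefix `a`. Store `a` once, then store `b1` and `b2` with both pointing at it. `extract` then returns `{start=a, end=b1}` and `{start=a, end=b2}`, while `size()` is 3 rather than 4.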
    
    To define the `NFA`, the library provides a pattern API, which lets you easily construct complex patterns with type and filter conditions. The specified pattern is then compiled into an `NFA`, which is responsible for detecting the patterns. See the online documentation (docs/libs/cep/index.md) for a full specification of the supported operations.
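
    A toy NFA can illustrate how a compiled pattern detects matches. `ToyNfa` is hypothetical and much simpler than Flink's `Pattern` and `NFA` classes. It assumes a linear pattern with distinct state names and does not model `within()`. Each run is a copied map rather than an entry in a shared buffer. A `next()` state requires the very next event to match. A `followedBy()` state skips events that do not match. Any event may also start a new run.

    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Predicate;

    // Toy illustration (hypothetical, not Flink's Pattern/NFACompiler/NFA): a linear
    // pattern is compiled into a list of states, and each incoming event is fed to
    // all active runs. within() and shared buffers are not modeled.
    public class ToyNfa<T> {

        private enum Contiguity { STRICT, RELAXED } // next() vs. followedBy()

        private static final class State<T> {
            final String name;
            final Predicate<T> condition;
            final Contiguity contiguity;

            State(String name, Predicate<T> condition, Contiguity contiguity) {
                this.name = name;
                this.condition = condition;
                this.contiguity = contiguity;
            }
        }

        private final List<State<T>> states = new ArrayList<>();
        private List<Map<String, T>> runs = new ArrayList<>(); // partial matches

        public ToyNfa(String startName, Predicate<T> condition) {
            states.add(new State<>(startName, condition, Contiguity.STRICT));
        }

        public ToyNfa<T> next(String name, Predicate<T> condition) {
            states.add(new State<>(name, condition, Contiguity.STRICT));
            return this;
        }

        public ToyNfa<T> followedBy(String name, Predicate<T> condition) {
            states.add(new State<>(name, condition, Contiguity.RELAXED));
            return this;
        }

        /** Feeds one event to every active run and returns the matches it completes. */
        public List<Map<String, T>> process(T event) {
            List<Map<String, T>> candidates = new ArrayList<>(runs);
            candidates.add(new LinkedHashMap<>()); // any event may start a new run
            List<Map<String, T>> survivors = new ArrayList<>();
            List<Map<String, T>> matches = new ArrayList<>();
            for (Map<String, T> run : candidates) {
                State<T> state = states.get(run.size()); // state this run waits for
                if (state.condition.test(event)) {
                    Map<String, T> advanced = new LinkedHashMap<>(run);
                    advanced.put(state.name, event);
                    if (advanced.size() == states.size()) {
                        matches.add(advanced); // all states matched
                    } else {
                        survivors.add(advanced);
                    }
                } else if (!run.isEmpty() && state.contiguity == Contiguity.RELAXED) {
                    survivors.add(run); // followedBy(): ignore the event, keep waiting
                }
                // next() state: a non-matching event discards the run
            }
            runs = survivors;
            return matches;
        }
    }
    ```

    Take a pattern `start` → `followedBy("end")` and feed it the events `a`, `c`, `b`. It produces one match, `{start=a, end=b}`, on the last event. With `next("end")` instead, the intervening `c` discards the run and nothing matches.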
    
    To run the `NFA`, the library adds two custom stream operators: `CEPPatternOperator` and `KeyedCEPPatternOperator`. The former is used for non-keyed input streams and the latter for keyed data streams. The `CEP.from(input, pattern)` method transparently selects the right operator.
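
    A hypothetical plain-Java sketch shows why the distinction matters. It does not show the operators themselves. Conceptually, the keyed variant keeps separate pattern state per key, so events of different keys cannot interleave into one match. The pattern here is deliberately trivial: an event named `name` directly followed by one named `critical`.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical illustration of keyed vs. non-keyed matching state.
    // Pattern: an event named "name" directly followed by one named "critical".
    public class KeyedMatchingSketch {

        public static final class Event {
            final int id;
            final String name;

            public Event(int id, String name) {
                this.id = id;
                this.name = name;
            }
        }

        private static boolean completes(Event previous, Event current) {
            return previous != null && previous.name.equals("name") && current.name.equals("critical");
        }

        /** Non-keyed: one state for the whole stream, so any event can break a partial match. */
        public static List<Integer> matchNonKeyed(List<Event> events) {
            List<Integer> matchedIds = new ArrayList<>();
            Event previous = null;
            for (Event e : events) {
                if (completes(previous, e)) {
                    matchedIds.add(e.id);
                }
                previous = e;
            }
            return matchedIds;
        }

        /** Keyed: one state per id, so only events with the same id count as adjacent. */
        public static List<Integer> matchKeyed(List<Event> events) {
            List<Integer> matchedIds = new ArrayList<>();
            Map<Integer, Event> previousById = new HashMap<>();
            for (Event e : events) {
                Event previous = previousById.put(e.id, e); // old value for this key, or null
                if (completes(previous, e)) {
                    matchedIds.add(e.id);
                }
            }
            return matchedIds;
        }
    }
    ```

    For the input `(1, name)`, `(2, other)`, `(1, critical)`, the keyed version reports id 1. The non-keyed version reports nothing, because the event with id 2 sits between the two events with id 1.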
    
    `CEP.from(input, pattern)` returns a `PatternStream`, which contains the matched event sequences. The event sequences can be processed by specifying a `PatternSelectFunction` or a `PatternFlatSelectFunction`. Both functions receive the detected pattern as a `Map<String, T>`, where `T` is the type of the input data stream. Each event is matched against a state of the pattern, and the name of that state is the event's key in the map.
    
    The following example shows how to use the API:
    
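    ```java
    import java.util.Map;
    
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    // Event and Alert are user-defined types.
    StreamExecutionEnvironment env = ...
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
    DataStream<Event> input = ...
    
    // Key the stream by event id so that the pattern is matched separately per id.
    DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
    	@Override
    	public Integer getKey(Event value) throws Exception {
    		return value.getId();
    	}
    });
    
    // "start" has no condition and matches any event, "middle" must directly follow it,
    // "end" may follow after other events, and the whole match must complete within 10 seconds.
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    	.next("middle").where(new FilterFunction<Event>() {
    		@Override
    		public boolean filter(Event value) throws Exception {
    			return value.getName().equals("name");
    		}
    	}).followedBy("end").where(new FilterFunction<Event>() {
    		@Override
    		public boolean filter(Event value) throws Exception {
    			return value.getName().equals("critical");
    		}
    	}).within(Time.seconds(10));
    
    PatternStream<Event> patternStream = CEP.from(partitionedInput, pattern);
    
    // The map is keyed by state name ("start", "middle", "end").
    DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    	@Override
    	public Alert select(Map<String, Event> pattern) throws Exception {
    		return new Alert(pattern.get("start"), pattern.get("end"));
    	}
    });
    ```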

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink cep

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1557.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1557
    
----
commit 5deda066c5f4851a426b15a05994be0796f5e6f3
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-01-14T09:04:23Z

    [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
    
    Implements NFA using the SharedBuffer
    
    Implements NFACompiler to compile a Pattern into a NFA
    
    Add CEP operator
    
    Makes NFA and SharedBuffer serializable
    
    Add serializability support to SharedBuffer and NFA
    
    Add keyed cep pattern operator

commit 6459946a62985e08c50ca397447af42662ec6558
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-01-28T15:41:01Z

    Adds CEP documentation
    
    Adds online documentation for the CEP library

----


> Define pattern specification
> ----------------------------
>
>                 Key: FLINK-3216
>                 URL: https://issues.apache.org/jira/browse/FLINK-3216
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Till Rohrmann
>
> In order to detect event patterns, we first have to define them. This issue tracks the progress of implementing a user-facing API for defining event patterns.
> Patterns should support the following operations:
> * next(): The given event has to follow directly after the preceding event
> * followedBy(): The given event has to follow the preceding event. Other events may occur in between
> * every(): In a follow-by relationship, a starting event can be matched with multiple successive events. Consider the pattern a → b, where → denotes the follow-by relationship. The event sequence a, b, b can be matched as a, b or as a, (b), b, where the first b is left out. The essential question is whether a is allowed to match multiple times or only the first time, and every() specifies exactly that: an event marked with every can match with multiple successive events. This only makes sense in a follow-by relationship, though.
> * followedByEvery(): Similar to followedBy(), except that the specified element can be matched with multiple successive events
> * or(): Alternative event which can be matched instead of the original event: every("e1").where().or("e2").where()
> * within(): Defines a time interval in which the pattern has to be completed; otherwise, an incomplete pattern can be emitted (timeout case)
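
The every() question can be made concrete for the pattern a → b under follow-by semantics. `FollowedBySketch.matches` is a hypothetical helper, not part of the proposed API. Without every, each a is consumed by the first b that follows it. With every, the same a pairs with each later b.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the every() discussion for a -> b
// (follow-by, so other events may occur in between).
public class FollowedBySketch {

    /** Returns the index pairs [i, j] with events[i] == a, events[j] == b and i < j. */
    public static List<List<Integer>> matches(List<String> events, String a, String b, boolean every) {
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(a)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals(b)) {
                    result.add(List.of(i, j));
                    if (!every) {
                        break; // without every(), a only matches its first successor b
                    }
                }
            }
        }
        return result;
    }
}
```

For the sequence a, b, b, this returns [[0, 1]] without every. With every, it returns [[0, 1], [0, 2]], where the second pair is the a, (b), b match from the description.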


