Posted to dev@bahir.apache.org by "Hao Chen (JIRA)" <ji...@apache.org> on 2017/11/18 12:39:00 UTC

[jira] [Created] (BAHIR-144) [FLINK-4520] Add Siddhi CEP integration with Flink streaming

Hao Chen created BAHIR-144:
------------------------------

             Summary: [FLINK-4520] Add Siddhi CEP integration with Flink streaming
                 Key: BAHIR-144
                 URL: https://issues.apache.org/jira/browse/BAHIR-144
             Project: Bahir
          Issue Type: New Feature
          Components: Flink Streaming Connectors
            Reporter: Hao Chen


Moved from:

* https://issues.apache.org/jira/browse/FLINK-4520
* https://github.com/apache/flink/pull/2487

Siddhi CEP is a lightweight, easy-to-use open source Complex Event Processing (CEP) engine released as a Java library under the `Apache Software License v2.0`. It processes events generated by various event sources, analyzes them, and emits the matching complex events according to user-specified queries.

**It would be very helpful for Flink users (especially streaming application developers) to have a library that runs Siddhi CEP queries directly in a Flink streaming application.**
# Features
- Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features such as
  - Filter
  - Join
  - Aggregation
  - Group by
  - Having
  - Window
  - Conditions and Expressions
  - Pattern processing
  - Sequence processing
  - Event Tables
    ...
- Provide an easy-to-use Siddhi CEP API that integrates with the Flink DataStream API (see `SiddhiCEP` and `SiddhiStream`)
  - Register a Flink DataStream, associating its native type information with a Siddhi stream schema; POJO, Tuple, primitive types, etc. are supported
  - Connect one or more Flink DataStreams to a Siddhi CEP execution plan
  - Return the output stream as a DataStream whose type is inferred from the Siddhi stream schema
- Integrate Siddhi runtime state management with Flink state (see `AbstractSiddhiOperator`)
- Support Siddhi plugin management to extend CEP functions (see `SiddhiCEP#registerExtension`)
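
To illustrate what "associating native type information with a Siddhi stream schema" could look like, here is a minimal plain-Java sketch that turns field names and Java types into a Siddhi `define stream` statement. The class `SiddhiSchemaSketch` and its methods are hypothetical helpers invented for this illustration and are not part of the proposed library. The `long` type for `timestamp` is also an assumption, since the issue does not state the field types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, NOT part of the flink-siddhi API: it only illustrates
// how Java field types could be mapped onto a Siddhi stream definition.
final class SiddhiSchemaSketch {

    // Maps a Java class to the name of a Siddhi attribute type.
    static String siddhiType(Class<?> javaType) {
        if (javaType == Integer.class || javaType == int.class) return "int";
        if (javaType == Long.class || javaType == long.class) return "long";
        if (javaType == Float.class || javaType == float.class) return "float";
        if (javaType == Double.class || javaType == double.class) return "double";
        if (javaType == Boolean.class || javaType == boolean.class) return "bool";
        if (javaType == String.class) return "string";
        return "object"; // fallback for any other type
    }

    // Builds a "define stream" statement; field order follows map iteration order.
    static String defineStream(String streamId, Map<String, Class<?>> fields) {
        StringBuilder sb = new StringBuilder("define stream ").append(streamId).append(" (");
        boolean first = true;
        for (Map.Entry<String, Class<?>> field : fields.entrySet()) {
            if (!first) {
                sb.append(", ");
            }
            sb.append(field.getKey()).append(' ').append(siddhiType(field.getValue()));
            first = false;
        }
        return sb.append(");").toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the schema matches the field order.
        Map<String, Class<?>> fields = new LinkedHashMap<>();
        fields.put("id", Integer.class);
        fields.put("name", String.class);
        fields.put("price", Double.class);
        fields.put("timestamp", Long.class); // assumed type, not stated in the issue
        System.out.println(defineStream("inputStream1", fields));
        // prints: define stream inputStream1 (id int, name string, price double, timestamp long);
    }
}
```

In the proposed integration this mapping would presumably be derived from Flink's type information rather than written by hand; the sketch only shows the shape of the result.
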
# Test Cases
- [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)
# Example

```
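// Assumes input1 and input2 are existing DataStreams whose fields map to
// (id, name, price, timestamp).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

// Register a custom Siddhi function extension under the name "custom:plus".
cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);

// Register each DataStream as a Siddhi stream with named fields.
cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");

// Match every inputStream1 event with id 2 that is followed by an
// inputStream2 event with id 3. Each string fragment ends with a space so
// the concatenated query keeps its clauses separated.
DataStream<Tuple5<Integer, String, Integer, String, Double>> output = cep
    .from("inputStream1").union("inputStream2")
    .sql(
        "from every s1 = inputStream1[id == 2] "
            + " -> s2 = inputStream2[id == 3] "
            + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, custom:plus(s1.price, s2.price) as price "
            + "insert into outputStream"
    )
    .returns("outputStream");

env.execute();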
```
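
To make the pattern in the query above more concrete, the following plain-Java sketch, independent of Flink and Siddhi, walks an in-memory event list and applies one simplified reading of `every s1 = inputStream1[id == 2] -> s2 = inputStream2[id == 3]`: each matching `s1` opens a partial match, and the next matching `s2` completes all open ones. The class `FollowedByPatternSketch` and its `Event` type are hypothetical, and the sketch assumes that `custom:plus` simply adds its two arguments. The real engine also handles time, state, and concurrency, which are ignored here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of an "every s1 -> s2" pattern; NOT Siddhi's implementation.
final class FollowedByPatternSketch {

    // Minimal event carrying the source stream name and the fields used by the query.
    static final class Event {
        final String stream;
        final int id;
        final String name;
        final double price;

        Event(String stream, int id, String name, double price) {
            this.stream = stream;
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }

    // Returns one "id_1,name_1,id_2,name_2,price" row per completed match.
    static List<String> match(List<Event> events) {
        List<Event> pending = new ArrayList<>(); // s1 events still waiting for an s2
        List<String> rows = new ArrayList<>();
        for (Event e : events) {
            if (e.stream.equals("inputStream1") && e.id == 2) {
                pending.add(e); // "every": each s1 opens a new partial match
            } else if (e.stream.equals("inputStream2") && e.id == 3) {
                for (Event s1 : pending) {
                    // Assumption: custom:plus(a, b) behaves like a + b.
                    rows.add(s1.id + "," + s1.name + "," + e.id + "," + e.name
                            + "," + (s1.price + e.price));
                }
                pending.clear(); // completed matches are consumed
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("inputStream1", 2, "apple", 10.0),
                new Event("inputStream1", 1, "pear", 1.0),  // filtered out: id != 2
                new Event("inputStream1", 2, "plum", 20.0),
                new Event("inputStream2", 3, "kiwi", 5.5),
                new Event("inputStream2", 3, "lime", 1.0)); // no open s1 remains
        for (String row : match(events)) {
            System.out.println(row);
        }
    }
}
```

With the sample events in `main`, the sketch emits two rows, `2,apple,3,kiwi,15.5` and `2,plum,3,kiwi,25.5`, each with the same five-field shape as the `Tuple5<Integer, String, Integer, String, Double>` output type in the example.
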



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)