You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/06/09 07:17:18 UTC

[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

    [ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044079#comment-16044079 ] 

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user asdf2014 commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    Hi, @haoch @rmetzger . Why i cannot find `flink-siddhi` in [bahir](https://github.com/apache/bahir) or [bahir-flink](https://github.com/apache/bahir-flink)? In addition, the [flink-siddhi](https://github.com/haoch/flink-siddhi) still depence on `flink v1.1.2`. May i ask how long will i could use these advanced features of `Siddhi CEP` on flink... 


> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
>                 Key: FLINK-4520
>                 URL: https://issues.apache.org/jira/browse/FLINK-4520
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>              Labels: cep, library, patch-available
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries. 
> It would be very helpful for flink users (especially streaming application developer) to provide a library to run Siddhi CEP query directly in Flink streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as an stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
>     * Filter
>     * Join
>     * Aggregation
>     * Group by
>     * Having
>     * Window
>     * Conditions and Expressions
>     * Pattern processing
>     * Sequence processing
>     * Event Tables
>     ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`)
>     * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
>     * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan
>     * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");
>  DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
>     "from every s1 = inputStream1[id == 2] "
>      + " -> s2 = inputStream2[id == 3] "
>      + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price"
>      + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)