Posted to dev@bahir.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/11/18 12:43:00 UTC
[jira] [Commented] (BAHIR-144) [FLINK-4520] Add Siddhi CEP integration with Flink streaming
[ https://issues.apache.org/jira/browse/BAHIR-144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258052#comment-16258052 ]
ASF GitHub Bot commented on BAHIR-144:
--------------------------------------
GitHub user haoch opened a pull request:
https://github.com/apache/bahir/pull/54
[BAHIR-144] Add flink-streaming-siddhi
Moved from https://github.com/apache/flink/pull/2487 as suggested by Flink community.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/haoch/bahir streaming-siddhi
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/bahir/pull/54.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #54
----
commit 2a36c786f2962ec1af631efbaa2defc4d168a1c5
Author: Hao Chen <hc...@ebay.com>
Date: 2017-11-18T12:34:21Z
Add flink-streaming-siddhi
----
> [FLINK-4520] Add Siddhi CEP integration with Flink streaming
> ------------------------------------------------------------
>
> Key: BAHIR-144
> URL: https://issues.apache.org/jira/browse/BAHIR-144
> Project: Bahir
> Issue Type: New Feature
> Components: Flink Streaming Connectors
> Reporter: Hao Chen
>
> Moved from:
> * https://issues.apache.org/jira/browse/FLINK-4520
> * https://github.com/apache/flink/pull/2487
> Siddhi CEP is a lightweight, easy-to-use open-source Complex Event Processing (CEP) engine released as a Java library under the `Apache Software License v2.0`. It processes events generated by various event sources, analyzes them, and emits the matching complex events according to user-specified queries.
> **A library that runs Siddhi CEP queries directly in Flink streaming applications would be very helpful for Flink users, especially streaming application developers.**
> # Features
> - Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features such as:
> - Filter
> - Join
> - Aggregation
> - Group by
> - Having
> - Window
> - Conditions and Expressions
> - Pattern processing
> - Sequence processing
> - Event Tables
> ...
> - Provide an easy-to-use Siddhi CEP API that integrates with the Flink DataStream API (see `SiddhiCEP` and `SiddhiStream`)
> - Register a Flink DataStream, associating its native type information with a Siddhi stream schema; POJO, Tuple, primitive types, etc. are supported.
> - Connect one or more Flink DataStreams to a Siddhi CEP execution plan
> - Return the output stream as a DataStream whose type is inferred from the Siddhi stream schema
> - Integrate Siddhi runtime state management with Flink state (see `AbstractSiddhiOperator`)
> - Support Siddhi plugin management to extend CEP functions (see `SiddhiCEP#registerExtension`)
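To make the schema-registration feature above more concrete, here is a minimal sketch of how field names and Java types might be turned into a Siddhi `define stream` statement. It is not taken from the pull request: `StreamDefinitionSketch`, `siddhiType`, and `defineStream` are hypothetical names, the type mapping is a simplified assumption, and the `long` type for `timestamp` is a guess, since the description does not state it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of flink-streaming-siddhi.
public class StreamDefinitionSketch {

    // Maps a Java field type to a Siddhi attribute type name (assumed mapping).
    public static String siddhiType(Class<?> javaType) {
        if (javaType == Integer.class || javaType == int.class) return "int";
        if (javaType == Long.class || javaType == long.class) return "long";
        if (javaType == Float.class || javaType == float.class) return "float";
        if (javaType == Double.class || javaType == double.class) return "double";
        if (javaType == Boolean.class || javaType == boolean.class) return "bool";
        if (javaType == String.class) return "string";
        return "object"; // fallback for any other type
    }

    // Builds "define stream <id> (<name> <type>, ...);" in field insertion order.
    public static String defineStream(String streamId, Map<String, Class<?>> fields) {
        StringJoiner attrs = new StringJoiner(", ", "(", ")");
        for (Map.Entry<String, Class<?>> field : fields.entrySet()) {
            attrs.add(field.getKey() + " " + siddhiType(field.getValue()));
        }
        return "define stream " + streamId + " " + attrs + ";";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the declared field order.
        Map<String, Class<?>> fields = new LinkedHashMap<>();
        fields.put("id", Integer.class);
        fields.put("name", String.class);
        fields.put("price", Double.class);
        fields.put("timestamp", Long.class); // assumed type
        System.out.println(defineStream("inputStream1", fields));
    }
}
```

The actual operator works from Flink `TypeInformation` rather than raw `Class` objects, so this only illustrates the shape of the mapping.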
> # Test Cases
> - [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)
> # Example
> ```
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
> // Register a custom function extension and two input DataStreams with their field names.
> cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
> cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
> cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");
> DataStream<Tuple5<Integer, String, Integer, String, Double>> output = cep
>     .from("inputStream1").union("inputStream2")
>     .sql(
>         // Each fragment ends with a space so the concatenated query stays well-formed.
>         "from every s1 = inputStream1[id == 2] "
>         + " -> s2 = inputStream2[id == 3] "
>         + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, custom:plus(s1.price, s2.price) as price "
>         + "insert into outputStream"
>     )
>     .returns("outputStream");
> env.execute();
> ```
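For readers unfamiliar with Siddhi patterns, the following plain-Java sketch approximates what the query `every s1 = inputStream1[id == 2] -> s2 = inputStream2[id == 3]` selects. It is a simplified illustration and not how Siddhi or the operator in this pull request is implemented. The `Event` class and `match` method are hypothetical, and `custom:plus` is assumed to add its two arguments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of "every A -> B" matching; not Siddhi code.
public class FollowedBySketch {

    // Minimal event: the stream it arrived on plus the fields the query reads.
    public static final class Event {
        final String stream;
        final int id;
        final String name;
        final double price;

        public Event(String stream, int id, String name, double price) {
            this.stream = stream;
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }

    // Returns one line per completed match, in arrival order.
    public static List<String> match(List<Event> events) {
        List<Event> pending = new ArrayList<>(); // s1 candidates waiting for an s2
        List<String> matches = new ArrayList<>();
        for (Event e : events) {
            if (e.stream.equals("inputStream1") && e.id == 2) {
                // "every": each qualifying s1 starts its own partial match.
                pending.add(e);
            } else if (e.stream.equals("inputStream2") && e.id == 3) {
                // The first qualifying s2 completes all partial matches waiting so far.
                for (Event s1 : pending) {
                    double price = s1.price + e.price; // assumed behavior of custom:plus
                    matches.add(s1.name + "->" + e.name + " price=" + price);
                }
                pending.clear();
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("inputStream1", 1, "a", 1.0),   // filtered out: id != 2
            new Event("inputStream1", 2, "b", 10.0),  // becomes s1
            new Event("inputStream2", 2, "c", 5.0),   // filtered out: id != 3
            new Event("inputStream2", 3, "d", 20.0)); // becomes s2 and completes the match
        match(events).forEach(System.out::println);
    }
}
```

In the real integration the two inputs are unioned Flink DataStreams and the matching state lives inside the Siddhi runtime, which the operator snapshots through Flink state.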