Posted to dev@bahir.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/11/20 20:36:00 UTC

[jira] [Commented] (BAHIR-144) Add Siddhi CEP integration with Flink streaming

    [ https://issues.apache.org/jira/browse/BAHIR-144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259797#comment-16259797 ] 

ASF GitHub Bot commented on BAHIR-144:
--------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/22#discussion_r152103730
  
    --- Diff: flink-library-siddhi/pom.xml ---
    @@ -0,0 +1,113 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  ~ Licensed to the Apache Software Foundation (ASF) under one or more
    +  ~ contributor license agreements.  See the NOTICE file distributed with
    +  ~ this work for additional information regarding copyright ownership.
    +  ~ The ASF licenses this file to You under the Apache License, Version 2.0
    +  ~ (the "License"); you may not use this file except in compliance with
    +  ~ the License.  You may obtain a copy of the License at
    +  ~
    +  ~    http://www.apache.org/licenses/LICENSE-2.0
    +  ~
    +  ~ Unless required by applicable law or agreed to in writing, software
    +  ~ distributed under the License is distributed on an "AS IS" BASIS,
    +  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  ~ See the License for the specific language governing permissions and
    +  ~ limitations under the License.
    +  -->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <parent>
    +        <groupId>org.apache.bahir</groupId>
    +        <artifactId>bahir-flink-parent_2.11</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <artifactId>flink-library-siddhi_2.11</artifactId>
    +    <name>flink-library-siddhi</name>
    +
    +    <packaging>jar</packaging>
    +
    +    <properties>
    +        <siddhi.version>4.0.0-M120</siddhi.version>
    +    </properties>
    +
    +    <dependencies>
    +        <!-- core dependencies -->
    +        <dependency>
    +            <groupId>org.wso2.siddhi</groupId>
    +            <artifactId>siddhi-core</artifactId>
    +            <version>${siddhi.version}</version>
    +            <exclusions>
    +                <exclusion>  <!-- declare the exclusion here -->
    +                    <groupId>org.apache.directory.jdbm</groupId>
    +                    <artifactId>apacheds-jdbm1</artifactId>
    +                </exclusion>
    +            </exclusions>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.wso2.siddhi</groupId>
    +            <artifactId>siddhi-query-api</artifactId>
    --- End diff --
    
    Dependency is Apache 2.0 licensed --> good.


> Add Siddhi CEP integration with Flink streaming
> -----------------------------------------------
>
>                 Key: BAHIR-144
>                 URL: https://issues.apache.org/jira/browse/BAHIR-144
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Flink Streaming Connectors
>            Reporter: Hao Chen
>
> Moved from:
> * https://issues.apache.org/jira/browse/FLINK-4520
> * https://github.com/apache/flink/pull/2487
> Siddhi CEP is a lightweight, easy-to-use open-source Complex Event Processing (CEP) engine released as a Java library under `Apache Software License v2.0`. It processes events generated by various event sources, analyzes them, and emits the matching complex events according to user-specified queries.
> **It would be very helpful for Flink users (especially streaming application developers) to have a library that runs Siddhi CEP queries directly in a Flink streaming application.**
> # Features
> - Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features such as
>   - Filter
>   - Join
>   - Aggregation
>   - Group by
>   - Having
>   - Window
>   - Conditions and Expressions
>   - Pattern processing
>   - Sequence processing
>   - Event Tables
>     ...
> - Provide an easy-to-use Siddhi CEP API that integrates with the Flink DataStream API (see `SiddhiCEP` and `SiddhiStream`)
>   - Register a Flink DataStream, associating its native type information with a Siddhi stream schema; POJO, Tuple, primitive types, etc. are supported
>   - Connect a single or multiple Flink DataStreams with a Siddhi CEP execution plan
>   - Return the output stream as a DataStream whose type is inferred from the Siddhi stream schema
> - Integrate Siddhi runtime state management with Flink state (see `AbstractSiddhiOperator`)
> - Support Siddhi plugin management to extend CEP functions (see `SiddhiCEP#registerExtension`)
> # Test Cases
> - [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)
> # Example
> ```
>  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");
>  DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep
>     .from("inputStream1").union("inputStream2")
>     .sql(
>         "from every s1 = inputStream1[id == 2] "
>          + " -> s2 = inputStream2[id == 3] "
>          + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price "
>          + "insert into outputStream"
>     )
>     .returns("outputStream");
>  env.execute();
> ```
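
For readers unfamiliar with Siddhi patterns, here is a rough plain-Java sketch of what the query in the example above appears intended to match. It uses neither Flink nor Siddhi. The `PatternSketch`, `Event`, and `Match` names are hypothetical, the `timestamp` field is omitted, and `custom:plus` is assumed to be simple addition. It also assumes that `every s1 -> s2` pairs each pending `s1` with the first `s2` that follows it, which should be checked against the Siddhi documentation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; none of these types come from Flink or Siddhi.
class PatternSketch {

    /** Hypothetical input event; fields mirror part of the registered stream schema. */
    static final class Event {
        final String stream;
        final int id;
        final String name;
        final double price;

        Event(String stream, int id, String name, double price) {
            this.stream = stream;
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }

    /** Hypothetical output row: id_1, name_1, id_2, name_2, price. */
    static final class Match {
        final int id1;
        final String name1;
        final int id2;
        final String name2;
        final double price;

        Match(int id1, String name1, int id2, String name2, double price) {
            this.id1 = id1;
            this.name1 = name1;
            this.id2 = id2;
            this.name2 = name2;
            this.price = price;
        }
    }

    /** Scans events in arrival order and emits one row per completed s1 -> s2 pair. */
    static List<Match> match(List<Event> events) {
        List<Event> pending = new ArrayList<>(); // s1 candidates still waiting for an s2
        List<Match> out = new ArrayList<>();
        for (Event e : events) {
            if (e.stream.equals("inputStream1") && e.id == 2) {
                // "every s1 = inputStream1[id == 2]": each such event opens a new match
                pending.add(e);
            } else if (e.stream.equals("inputStream2") && e.id == 3) {
                // "-> s2 = inputStream2[id == 3]": completes all pending matches
                for (Event s1 : pending) {
                    // custom:plus is assumed here to add the two prices
                    out.add(new Match(s1.id, s1.name, e.id, e.name, s1.price + e.price));
                }
                pending.clear();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("inputStream1", 2, "a", 1.0));
        events.add(new Event("inputStream1", 1, "ignored", 9.0));
        events.add(new Event("inputStream2", 3, "b", 2.5));
        for (Match m : match(events)) {
            System.out.println(m.id1 + "," + m.name1 + "," + m.id2 + "," + m.name2 + "," + m.price);
        }
    }
}
```

The five fields of `Match` line up with the `Tuple5<Integer,String,Integer,String,Double>` output type declared in the example.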



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)