Posted to issues@flink.apache.org by haoch <gi...@git.apache.org> on 2016/09/10 02:29:49 UTC

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

GitHub user haoch opened a pull request:

    https://github.com/apache/flink/pull/2487

    [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-weight Streaming CEP Library

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    # Abstract
    Siddhi CEP is a lightweight and easy-to-use open source Complex Event Processing (CEP) engine, released as a Java library under the `Apache Software License v2.0`. Siddhi CEP processes events generated by various event sources, analyses them, and notifies about matching complex events according to user-specified queries.
    
    __It would be very helpful for Flink users (especially streaming application developers) to have a library that runs Siddhi CEP queries directly inside Flink streaming applications.__
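    
    For context, here is a minimal sketch of standalone Siddhi usage, assuming the Siddhi 3.x API this library builds on (the stream names, query, and values are illustrative only):
    
         // Hypothetical standalone example: a SiddhiQL plan with one input stream and a filter query.
         SiddhiManager siddhiManager = new SiddhiManager();
         String plan = "define stream inStream (symbol string, price double); "
             + "from inStream[price > 100] select symbol, price insert into outStream;";
         ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(plan);
         // Receive the matched complex events.
         runtime.addCallback("outStream", new StreamCallback() {
             @Override
             public void receive(Event[] events) {
                 for (Event event : events) {
                     System.out.println(event);
                 }
             }
         });
         InputHandler handler = runtime.getInputHandler("inStream");
         runtime.start();
         handler.send(new Object[]{"IBM", 120.5});
         runtime.shutdown();
    
    This library wires that same runtime into Flink stream operators, so input events come from Flink DataStreams and matched events are emitted back as a DataStream.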
    
    # Features
    
    * Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features such as:
      * Filter
      * Join
      * Aggregation
      * Group by
      * Having
      * Window
      * Conditions and Expressions
      * Pattern processing
      * Sequence processing
      * Event Tables
      * ...
    * Provide an easy-to-use Siddhi CEP API that integrates with the Flink DataStream API (see `SiddhiCEP` and `SiddhiStream`)
      * Register Flink DataStreams, associating their native type information with a Siddhi Stream Schema (supporting POJO, Tuple, primitive types, etc.)
      * Connect single or multiple Flink DataStreams to a Siddhi CEP execution plan
      * Return the output stream as a DataStream, with its type inferred from the Siddhi Stream Schema
    * Integrate Siddhi runtime state management with Flink state (see `AbstractSiddhiOperator`)
    * Support Siddhi plugin management to extend CEP functions (see `SiddhiCEP#registerExtension`)
    
    # Test Cases 
    
    * [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)
    
    # Example
    
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

         cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);

         cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
         cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");

         DataStream<Tuple5<Integer, String, Integer, String, Double>> output = cep
             .from("inputStream1").union("inputStream2")
             .sql(
                 "from every s1 = inputStream1[id == 2] "
                 + " -> s2 = inputStream2[id == 3] "
                 + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, custom:plus(s1.price, s2.price) as price "
                 + "insert into outputStream"
             )
             .returns("outputStream");

         env.execute();


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haoch/flink FLINK-4520

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2487
    
----
commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0
Author: Hao Chen <hc...@ebay.com>
Date:   2016-08-29T12:34:42Z

    Implement initial version of flink-siddhi stream operator

commit d8b131d9204e9e359afae80087afe2aecab27eaf
Author: Chen, Hao <ha...@apache.org>
Date:   2016-08-30T14:33:59Z

    Implement SiddhiStream API and DSL

commit 56c150fadc54d06c186223e43a6fd9ac74eee837
Author: Chen, Hao <ha...@apache.org>
Date:   2016-08-30T18:31:18Z

    Reformat code following checkstyle

commit 138b0833715a7d110ddb4562e83fb6f0d5338fea
Author: Chen, Hao <ha...@apache.org>
Date:   2016-08-31T17:41:52Z

    Add SiddhiTypeUtils and support stream as typed tuple

commit 2484c998bd4def5330012bd6797fb807939752b7
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-07T17:10:03Z

    Support siddhi stream union

commit 0a12f7d3ddad1d0fa7a216524212f777d722a3f7
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-07T17:40:31Z

    Support siddhi extension and registery

commit 9dc425960b212e426e50683778aa3939492669e2
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-07T18:02:55Z

    Fix import checkstyle

commit e618bb2774bd0d6af77d11f65c2200cf18253d1f
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-08T15:29:45Z

    Refactor registerExtension interface

commit 13d76b6c16ca538ff8113352310f9568035eb92a
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-08T17:55:41Z

    Decouple SiddhiCEP API to environment instance isolation level

commit 26498dee5fefd6520701f6ee23441968642656a8
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-08T18:00:19Z

    Add unit test testTriggerUndefinedStreamException

commit 23dabddf1f20665161861c6757c48c5ae0b0bb08
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-08T18:22:18Z

    Decouple data stream converter with streamId

commit 8bbf6f3417e0c177638301c18379c8aadc3746ee
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-09T08:16:51Z

    Sorting buffer queue state persistence and restore logic, support return Map<String,Object>

commit fcb4a2c85abd604965f2e375d767f6dbd2711222
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-09T09:28:24Z

    Fix checkstyle error

commit 466aafca86899c89b7b6e6d25f844880f9513c51
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-09T17:34:11Z

    Reuse StreamRecord object while collecting output

commit f7f9289557b1da3304ea3d43f7d2441ff314a755
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-09T18:34:30Z

    Add flink-sddhi doc in package-info.java

commit ef558ce668b8403505afbaad01bfafa475e60037
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-10T02:21:22Z

    Merge branch 'master' of https://github.com/apache/flink into FLINK-4520-fix

commit 4cd3ea7646ecdc5c22fa85ae9953675b9448b044
Author: Chen, Hao <ha...@apache.org>
Date:   2016-09-10T02:26:53Z

    Refactor siddhi stater snapshot/restore according to new operator state interface

----


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78447224
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi.operator;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.StreamSchema;
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTask;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.wso2.siddhi.core.ExecutionPlanRuntime;
    +import org.wso2.siddhi.core.SiddhiManager;
    +import org.wso2.siddhi.core.stream.input.InputHandler;
    +import org.wso2.siddhi.query.api.definition.AbstractDefinition;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.PriorityQueue;
    +
    +public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
    +	private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
    +	private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    +
    +	private final SiddhiOperatorContext siddhiPlan;
    +	private final String executionExpression;
    +	private final boolean isProcessingTime;
    +	private final Map<String, MultiplexingStreamRecordSerializer<IN>> streamRecordSerializers;
    +
    +	private transient SiddhiManager siddhiManager;
    +	private transient ExecutionPlanRuntime siddhiRuntime;
    +	private transient Map<String, InputHandler> inputStreamHandlers;
    +
    +	// queue to buffer out of order stream records
    +	private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
    +
    +	/**
    +	 * @param siddhiPlan Siddhi CEP  Execution Plan
    +	 */
    +	public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
    +		validate(siddhiPlan);
    +		this.executionExpression = siddhiPlan.getFinalExecutionPlan();
    +		this.siddhiPlan = siddhiPlan;
    +		this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
    +		this.streamRecordSerializers = new HashMap<>();
    +
    +		for (String streamId : this.siddhiPlan.getInputStreams()) {
    +			streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
    +		}
    +	}
    +
    +	protected abstract MultiplexingStreamRecordSerializer<IN> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig);
    +
    +	protected MultiplexingStreamRecordSerializer<IN> getStreamRecordSerializer(String streamId) {
    +		if (streamRecordSerializers.containsKey(streamId)) {
    +			return streamRecordSerializers.get(streamId);
    +		} else {
    +			throw new UndefinedStreamException("Stream " + streamId + " not defined");
    +		}
    +	}
    +
    +	@Override
    +	public void processElement(StreamRecord<IN> element) throws Exception {
    --- End diff --
    
    This code seems to be similar to the code from CEP library. Can we reuse it somehow?


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @StephanEwen thanks for the comments. I think it's OK to keep this either in the core or as a separate project, but my concern is that it may be better for community development to centralize qualified libraries together. As an alternative way to address the test stability and dead code concerns, would it be possible to create another code repository, say "flink-library"?
    
    **BTW: here are the answers to your questions one by one:**
    
    > How complete is the implementation?
    
    Siddhi is a feature-rich CEP engine with its own community, and is perhaps the only open source CEP solution compatible with the Apache License. This library, `flink-siddhi`, is mainly focused on bringing Siddhi's capabilities to Flink users seamlessly by:
    
    - Integrating the Siddhi CEP runtime with the Flink operator lifecycle
    - Mapping schemas and DataStream sources
    - Managing state and fault tolerance
    
    So I think it is extremely light-weight but useful, and the current implementation is almost complete.
    
    > Would you be up for maintaining this code?
    
    Sure. First of all, I am personally very willing to keep contributing to the Flink project in any way.
    
    We also use Siddhi with distributed streaming systems a lot in production, and are currently considering supporting Flink as well, given its better state management and windowing support. So I will continue to maintain the code if it is merged; if not, I will maintain it at https://github.com/haoch/flink-siddhi as well to make sure it stays workable.
    
    > Are you building this as an experiment, or building a production use case based on Siddhi on Flink?
    
    We use Siddhi in streaming environments in production a lot, currently on top of Storm and Spark Streaming, and we are also considering extending this to Flink.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    That all sounds very good.
    I would personally like it if this contribution entered Flink in some way.
    
    There have been thoughts and discussions once in a while about creating dedicated sub-projects for libraries/extensions like this, or at least a dedicated repository under Flink. I think this would be a great opportunity to revive those discussions.
    
    Let me start a thread on the mailing list.
    
    @haoch I hope you are okay with waiting for a few days for that discussion to come to a conclusion.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    Hi @haoch,
    
    I think it would be beneficial if you wrote a few words describing your design or added more JavaDocs. This would make the review process more straightforward.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    Keeping this PR open until https://github.com/apache/bahir-flink/pull/22 is merged.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    Thank you for that big contribution. Siddhi looks like a cool approach to CEP.
    
    Before digging into the details, I would like to start a discussion about whether we should have this as a part of the core Flink repository, as a subproject, or if it would be best to have it initially as an external project.
    
    The reason is that the Flink repository is becoming a bit big right now. Build times are very long, test stability is hard to manage, and there is quite a bit of "dead" code that was contributed by someone at some point but seems rarely used and is not maintained by the contributors.
    
    To help have a good discussion, it would be great to learn a bit more:
      - How complete is the implementation? 
      - Would you be up for maintaining this code?
      - Are you building this as an experiment, or building a production use case based on Siddhi on Flink?
    
    Thanks,
    Stephan


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @haoch What do you think about Robert's suggestion to move this to Bahir? Seems like a reasonable first step to me.


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78447005
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi.operator;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.StreamSchema;
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTask;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.wso2.siddhi.core.ExecutionPlanRuntime;
    +import org.wso2.siddhi.core.SiddhiManager;
    +import org.wso2.siddhi.core.stream.input.InputHandler;
    +import org.wso2.siddhi.query.api.definition.AbstractDefinition;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.PriorityQueue;
    +
    +public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
    +	private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
    +	private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    +
    +	private final SiddhiOperatorContext siddhiPlan;
    +	private final String executionExpression;
    +	private final boolean isProcessingTime;
    +	private final Map<String, MultiplexingStreamRecordSerializer<IN>> streamRecordSerializers;
    +
    +	private transient SiddhiManager siddhiManager;
    +	private transient ExecutionPlanRuntime siddhiRuntime;
    +	private transient Map<String, InputHandler> inputStreamHandlers;
    +
    +	// queue to buffer out of order stream records
    +	private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
    +
    +	/**
    +	 * @param siddhiPlan Siddhi CEP  Execution Plan
    +	 */
    +	public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
    +		validate(siddhiPlan);
    +		this.executionExpression = siddhiPlan.getFinalExecutionPlan();
    +		this.siddhiPlan = siddhiPlan;
    +		this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
    +		this.streamRecordSerializers = new HashMap<>();
    +
    +		for (String streamId : this.siddhiPlan.getInputStreams()) {
    --- End diff --
    
    This can be moved into a separate method.
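    For illustration, a minimal sketch of that refactoring (the helper name `registerStreamRecordSerializers` is hypothetical; the loop body is the one from the constructor above):
    
        // Hypothetical helper extracted from the constructor: one serializer per registered input stream.
        private void registerStreamRecordSerializers() {
            for (String streamId : this.siddhiPlan.getInputStreams()) {
                streamRecordSerializers.put(streamId, createStreamRecordSerializer(
                    this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
            }
        }
    
    The constructor would then simply call `registerStreamRecordSerializers();`.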


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @asdf2014 sorry for the delayed response, I will find some time to finalize this PR and propose it to the Bahir project.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by asdf2014 <gi...@git.apache.org>.
Github user asdf2014 commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @rmetzger Alright. Thank you for asking, but I think our company still plans to use `Flink-CEP`, and contributing a `Bahir-Siddhi` feature is a huge job, so... I'm sorry.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    As mentioned above, the patch has moved to Apache Bahir at https://github.com/apache/bahir/pull/54, so I am resolving this one.


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by haoch <gi...@git.apache.org>.
Github user haoch closed the pull request at:

    https://github.com/apache/flink/pull/2487


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by apivovarov <gi...@git.apache.org>.
Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78270750
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +	private final StreamExecutionEnvironment executionEnvironment;
    +	private final Map<String, DataStream<?>> dataStreams;
    +	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
    +	private final Map<String,Class<?>> extensions = new HashMap<>();
    +
    +	public Map<String, DataStream<?>> getDataStreams(){
    +		return this.dataStreams;
    +	}
    +
    +	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas(){
    +		return this.dataStreamSchemas;
    +	}
    +
    +	public boolean isStreamDefined(String streamId){
    +		return dataStreams.containsKey(streamId);
    +	}
    +
    +	public Map<String,Class<?>> getExtensions(){
    +		return this.extensions;
    +	}
    +
    +	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +		if(!isStreamDefined(streamId)){
    +			throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined");
    +		}
    +	}
    +
    +	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +		this.executionEnvironment = streamExecutionEnvironment;
    +		this.dataStreams = new HashMap<>();
    +		this.dataStreamSchemas = new HashMap<>();
    --- End diff --
    
    Lines 64 and 65 can be removed.
    Add `= new HashMap<>();` to lines 36 and 37, similar to what was done on line 38.
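    A minimal sketch of the suggested change (field and type names as in the diff):
    
        // Initialize all three maps at the declaration site, as already done for `extensions`...
        private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
        private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
        private final Map<String, Class<?>> extensions = new HashMap<>();

        // ...so the constructor keeps only the environment assignment.
        public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
            this.executionEnvironment = streamExecutionEnvironment;
        }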


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    Finally, the PR has been merged via https://github.com/apache/bahir-flink/pull/22, thanks everyone for the review!


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @apivovarov thanks very much for the comments. I have formatted all the code as required. Please kindly continue reviewing.


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446460
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +	private final StreamExecutionEnvironment executionEnvironment;
    +	private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
    +	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
    +	private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +	public Map<String, DataStream<?>> getDataStreams() {
    +		return this.dataStreams;
    +	}
    +
    +	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
    +		return this.dataStreamSchemas;
    +	}
    +
    +	public boolean isStreamDefined(String streamId) {
    +		return dataStreams.containsKey(streamId);
    +	}
    +
    +	public Map<String, Class<?>> getExtensions() {
    +		return this.extensions;
    +	}
    +
    +	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +		if (!isStreamDefined(streamId)) {
    +			throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
    +		}
    +	}
    +
    +	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +		this.executionEnvironment = streamExecutionEnvironment;
    --- End diff --
    
    I would suggest using the `Preconditions` class to check the input.
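    For example, a sketch of that check using Flink's `org.apache.flink.util.Preconditions`:
    
        public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
            // Fail fast with a clear message instead of a NullPointerException later on.
            this.executionEnvironment = Preconditions.checkNotNull(streamExecutionEnvironment,
                "streamExecutionEnvironment must not be null");
        }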


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    Hi @haoch, thanks a lot for this contribution.
    
    I recently started moving some of the streaming connectors of Flink to Apache Bahir, a community for extensions to Spark, Flink (and maybe others).
    You wrote in an earlier comment:
    
    > I think it's OK to keep this either in the core or as a separate project, but my concern is that it may be better for community development to centralize qualified libraries together.
    
    I think Bahir addresses this issue nicely. So far we have added only streaming connectors to Bahir, but I would like to see libraries and other things built on top of Flink there as well.
    I'm a committer at Bahir and can help you to get the code in there.
    The Bahir repository is located here https://github.com/apache/bahir-flink
    
    By the way, the tests you've added are failing on our CI system. Can you look into it? https://s3.amazonaws.com/archive.travis-ci.org/jobs/166483919/log.txt


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @asdf2014 We're currently working on integrating the Flink Table & SQL API with CEP. It will add more capability to the Flink CEP library and also make it easier to use. An initial design doc is available [here](https://docs.google.com/document/d/1HaaO5eYI1VZjyhtVPZOi3jVzikU7iK15H0YbniTnN30/edit#). The prototype and a detailed design doc will come out very soon.


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by apivovarov <gi...@git.apache.org>.
Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78270764
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +	private final StreamExecutionEnvironment executionEnvironment;
    +	private final Map<String, DataStream<?>> dataStreams;
    +	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
    +	private final Map<String,Class<?>> extensions = new HashMap<>();
    +
    +	public Map<String, DataStream<?>> getDataStreams(){
    +		return this.dataStreams;
    +	}
    +
    +	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas(){
    +		return this.dataStreamSchemas;
    +	}
    +
    +	public boolean isStreamDefined(String streamId){
    +		return dataStreams.containsKey(streamId);
    +	}
    +
    +	public Map<String,Class<?>> getExtensions(){
    +		return this.extensions;
    +	}
    +
    +	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +		if(!isStreamDefined(streamId)){
    +			throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined");
    +		}
    +	}
    +
    +	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +		this.executionEnvironment = streamExecutionEnvironment;
    +		this.dataStreams = new HashMap<>();
    +		this.dataStreamSchemas = new HashMap<>();
    +	}
    +
    +	public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> inStream, String... fieldNames) {
    +		SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
    +		return environment.from(streamId,inStream,fieldNames);
    +	}
    +
    +	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> inStream, String... fieldNames){
    +		this.registerStream(streamId,inStream,fieldNames);
    +		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +	}
    +
    +	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId){
    +		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +	}
    +
    +	public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId,String ... unionStreamIds){
    +		return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId,this).union(unionStreamIds);
    +	}
    +
    +	public  <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
    +		if (isStreamDefined(streamId)) {
    +			throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
    +		}
    +		dataStreams.put(streamId, dataStream);
    +		SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
    +		schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
    +		dataStreamSchemas.put(streamId, schema);
    +	}
    +
    +	public StreamExecutionEnvironment getExecutionEnvironment() {
    +		return executionEnvironment;
    +	}
    +
    +	public void registerExtension(String extensionName, Class<?> extensionClass) {
    +		if(extensions.containsKey(extensionName)){
    +			throw new IllegalArgumentException("Extension named "+extensionName+" already registered");
    +		}
    +		extensions.put(extensionName,extensionClass);
    +	}
    +
    +	public <T> DataStream<T> getDataStream(String streamId) {
    +		if(this.dataStreams.containsKey(streamId)){
    +			return (DataStream<T>) this.dataStreams.get(streamId);
    +		}else{
    +			throw new UndefinedStreamException("Undefined stream "+streamId);
    +		}
    --- End diff --
    
    The block at lines 108-112 is not formatted.


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446628
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    --- End diff --
    
    I think this class would benefit from more JavaDocs.
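    For illustration, the kind of JavaDoc that would help, sketched on the `registerStream` method from this diff (the comment wording is mine; the signature and body are unchanged):
    
        /**
         * Registers a Flink {@link DataStream} under the given Siddhi stream id and maps
         * the stream's type information to a Siddhi stream schema.
         *
         * @param <T>        element type of the DataStream
         * @param streamId   unique id of the Siddhi stream
         * @param dataStream the Flink DataStream to register
         * @param fieldNames field names of the Siddhi stream definition
         */
        public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
            if (isStreamDefined(streamId)) {
                throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
            }
            dataStreams.put(streamId, dataStream);
            SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
            schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
            dataStreamSchemas.put(streamId, schema);
        }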


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446790
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiStream.java ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext;
    +import org.apache.flink.contrib.siddhi.utils.SiddhiStreamFactory;
    +import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Arrays;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP API Interface
    + */
    +@PublicEvolving
    +public abstract class SiddhiStream {
    +	private final SiddhiCEP environment;
    +
    +	public SiddhiStream(SiddhiCEP environment) {
    --- End diff --
    
    Could we name this `cepEnvironment`?


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @uce sure, I will fix it and resend the PR to Bahir.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by asdf2014 <gi...@git.apache.org>.
Github user asdf2014 commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @dianfu Great! Looking forward to it.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @asdf2014 I think there was never a PR for Siddhi at the Bahir project.
    But if you are interested, you could work on contributing it to Bahir.


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78449235
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/SiddhiOperatorContext.java ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi.operator;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.contrib.siddhi.schema.StreamSchema;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.util.Preconditions;
    +import org.wso2.siddhi.core.SiddhiManager;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * SiddhiCEP Operator Execution Context
    + */
    +public class SiddhiOperatorContext implements Serializable {
    --- End diff --
    
    I think all public methods would benefit from JavaDocs


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @mushketyk I have added lots of JavaDocs as required in the latest commit: https://github.com/apache/flink/pull/2487/commits/4699f9c3dfc4ce0a9837eb60579c76d50b346f03, please continue to help review.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @mushketyk thanks very much for reviewing, I will fix these as required soon.


---

[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    @StephanEwen sure, that's all ok. :-)


---

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446343
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +	private final StreamExecutionEnvironment executionEnvironment;
    +	private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
    +	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
    +	private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +	public Map<String, DataStream<?>> getDataStreams() {
    +		return this.dataStreams;
    +	}
    +
    +	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
    +		return this.dataStreamSchemas;
    +	}
    +
    +	public boolean isStreamDefined(String streamId) {
    +		return dataStreams.containsKey(streamId);
    +	}
    +
    +	public Map<String, Class<?>> getExtensions() {
    +		return this.extensions;
    +	}
    +
    +	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +		if (!isStreamDefined(streamId)) {
    +			throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
    +		}
    +	}
    +
    +	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    --- End diff --
    
    Could you put the constructor right after the field definitions?
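
    A minimal sketch of the suggested ordering (fields first, then the constructor, then the public methods); the members are taken from the class under review, imports as in the original SiddhiCEP.java:

        @PublicEvolving
        public class SiddhiCEP {
            // Fields are declared first ...
            private final StreamExecutionEnvironment executionEnvironment;
            private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
            private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
            private final Map<String, Class<?>> extensions = new HashMap<>();

            // ... followed directly by the constructor ...
            public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
                this.executionEnvironment = streamExecutionEnvironment;
            }

            // ... and only then the public methods.
            public Map<String, DataStream<?>> getDataStreams() {
                return this.dataStreams;
            }
        }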



[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446582
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +	private final StreamExecutionEnvironment executionEnvironment;
    +	private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
    +	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
    +	private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +	public Map<String, DataStream<?>> getDataStreams() {
    +		return this.dataStreams;
    +	}
    +
    +	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
    +		return this.dataStreamSchemas;
    +	}
    +
    +	public boolean isStreamDefined(String streamId) {
    +		return dataStreams.containsKey(streamId);
    +	}
    +
    +	public Map<String, Class<?>> getExtensions() {
    +		return this.extensions;
    +	}
    +
    +	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +		if (!isStreamDefined(streamId)) {
    +			throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
    +		}
    +	}
    +
    +	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +		this.executionEnvironment = streamExecutionEnvironment;
    --- End diff --
    
    The same comment applies to the other public methods of this class.



[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    Cool, thank you!



[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

Posted by asdf2014 <gi...@git.apache.org>.
Github user asdf2014 commented on the issue:

    https://github.com/apache/flink/pull/2487
  
    Hi, @haoch @rmetzger. Why can't I find `flink-siddhi` in [bahir](https://github.com/apache/bahir) or [bahir-flink](https://github.com/apache/bahir-flink)? In addition, the [flink-siddhi](https://github.com/haoch/flink-siddhi) project still depends on `flink v1.1.2`. May I ask how long it will be until I can use these advanced features of `Siddhi CEP` on Flink?



[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

Posted by apivovarov <gi...@git.apache.org>.
Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78270757
  
    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +	private final StreamExecutionEnvironment executionEnvironment;
    +	private final Map<String, DataStream<?>> dataStreams;
    +	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
    +	private final Map<String,Class<?>> extensions = new HashMap<>();
    +
    +	public Map<String, DataStream<?>> getDataStreams(){
    +		return this.dataStreams;
    +	}
    +
    +	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas(){
    +		return this.dataStreamSchemas;
    +	}
    +
    +	public boolean isStreamDefined(String streamId){
    +		return dataStreams.containsKey(streamId);
    +	}
    +
    +	public Map<String,Class<?>> getExtensions(){
    +		return this.extensions;
    +	}
    +
    +	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +		if(!isStreamDefined(streamId)){
    +			throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined");
    +		}
    +	}
    +
    +	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +		this.executionEnvironment = streamExecutionEnvironment;
    +		this.dataStreams = new HashMap<>();
    +		this.dataStreamSchemas = new HashMap<>();
    +	}
    +
    +	public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> inStream, String... fieldNames) {
    +		SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
    +		return environment.from(streamId,inStream,fieldNames);
    +	}
    +
    +	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> inStream, String... fieldNames){
    +		this.registerStream(streamId,inStream,fieldNames);
    +		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +	}
    +
    +	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId){
    +		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +	}
    +
    +	public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId,String ... unionStreamIds){
    +		return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId,this).union(unionStreamIds);
    +	}
    +
    +	public  <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
    +		if (isStreamDefined(streamId)) {
    +			throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
    +		}
    +		dataStreams.put(streamId, dataStream);
    +		SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
    +		schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
    +		dataStreamSchemas.put(streamId, schema);
    +	}
    +
    +	public StreamExecutionEnvironment getExecutionEnvironment() {
    +		return executionEnvironment;
    +	}
    +
    +	public void registerExtension(String extensionName, Class<?> extensionClass) {
    +		if(extensions.containsKey(extensionName)){
    --- End diff --
    
    The code is not formatted consistently; please put a space after `if` and before `{`.
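
    For example, a minimal sketch of the formatted form; the method body shown here (rejecting duplicate extension names) is assumed for illustration and may differ from the actual PR code:

        public void registerExtension(String extensionName, Class<?> extensionClass) {
            // Formatted per the usual Java style: space after `if` and before `{`.
            if (extensions.containsKey(extensionName)) {
                // Assumption: duplicate registrations are rejected with an exception.
                throw new IllegalArgumentException("Extension (name: " + extensionName + ") already registered");
            }
            extensions.put(extensionName, extensionClass);
        }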

