You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2017/11/26 15:13:28 UTC

[3/3] bahir-flink git commit: [BAHIR-144] Add flink-library-siddhi

[BAHIR-144] Add flink-library-siddhi

This closes #22


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/2f47eedc
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/2f47eedc
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/2f47eedc

Branch: refs/heads/BAHIR-144-merge
Commit: 2f47eedc03250b07bc75880b5700e56386ef64b5
Parents: 4f0179a
Author: Hao Chen <hc...@ebay.com>
Authored: Sat Nov 18 22:15:32 2017 +0800
Committer: Robert Metzger <rm...@apache.org>
Committed: Sun Nov 26 16:13:11 2017 +0100

----------------------------------------------------------------------
 flink-library-siddhi/pom.xml                    | 113 ++++++
 .../flink/streaming/siddhi/SiddhiCEP.java       | 231 +++++++++++
 .../flink/streaming/siddhi/SiddhiStream.java    | 279 +++++++++++++
 .../exception/DuplicatedStreamException.java    |  24 ++
 .../exception/UndefinedStreamException.java     |  24 ++
 .../siddhi/operator/AbstractSiddhiOperator.java | 338 ++++++++++++++++
 .../siddhi/operator/SiddhiOperatorContext.java  | 227 +++++++++++
 .../siddhi/operator/SiddhiStreamOperator.java   |  80 ++++
 .../operator/StreamInMemOutputHandler.java      | 104 +++++
 .../siddhi/operator/StreamOutputHandler.java    | 101 +++++
 .../siddhi/operator/StreamRecordComparator.java |  41 ++
 .../flink/streaming/siddhi/package-info.java    |  78 ++++
 .../siddhi/schema/SiddhiStreamSchema.java       |  72 ++++
 .../streaming/siddhi/schema/StreamSchema.java   | 173 ++++++++
 .../siddhi/schema/StreamSerializer.java         |  76 ++++
 .../siddhi/utils/SiddhiStreamFactory.java       |  33 ++
 .../siddhi/utils/SiddhiTupleFactory.java        | 128 ++++++
 .../siddhi/utils/SiddhiTypeFactory.java         | 136 +++++++
 .../flink/streaming/siddhi/SiddhiCEPITCase.java | 403 +++++++++++++++++++
 .../extension/CustomPlusFunctionExtension.java  | 107 +++++
 .../siddhi/operator/SiddhiSyntaxTest.java       |  83 ++++
 .../schema/SiddhiExecutionPlanSchemaTest.java   |  49 +++
 .../siddhi/schema/StreamSchemaTest.java         |  94 +++++
 .../siddhi/schema/StreamSerializerTest.java     |  40 ++
 .../flink/streaming/siddhi/source/Event.java    | 110 +++++
 .../siddhi/source/RandomEventSource.java        |  72 ++++
 .../siddhi/source/RandomTupleSource.java        |  74 ++++
 .../siddhi/source/RandomWordSource.java         | 111 +++++
 .../siddhi/utils/SiddhiTupleFactoryTest.java    |  46 +++
 .../siddhi/utils/SiddhiTypeFactoryTest.java     |  50 +++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/logback-test.xml         |  34 ++
 pom.xml                                         |   1 +
 33 files changed, 3559 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
new file mode 100644
index 0000000..91c6797
--- /dev/null
+++ b/flink-library-siddhi/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-library-siddhi_2.11</artifactId>
+    <name>flink-library-siddhi</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <siddhi.version>4.0.0-M120</siddhi.version>
+    </properties>
+
+    <dependencies>
+        <!-- core dependencies -->
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-core</artifactId>
+            <version>${siddhi.version}</version>
+            <exclusions>
+                <exclusion>  <!-- declare the exclusion here -->
+                    <groupId>org.apache.directory.jdbm</groupId>
+                    <artifactId>apacheds-jdbm1</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-query-api</artifactId>
+            <version>${siddhi.version}</version>
+            <exclusions>
+                <exclusion>  <!-- declare the exclusion here -->
+                    <groupId>org.apache.directory.jdbm</groupId>
+                    <artifactId>apacheds-jdbm1</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Core streaming API -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <repositories>
+        <repository>
+            <id>wso2-maven2-repository</id>
+            <name>WSO2 Maven2 Repository</name>
+            <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
+        </repository>
+    </repositories>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
new file mode 100644
index 0000000..a63dbf6
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Siddhi CEP Environment, provides utility methods to
+ *
+ * <ul>
+ *     <li>Initialize SiddhiCEP environment based on {@link StreamExecutionEnvironment}</li>
+ *     <li>Register {@link SiddhiStream} with field-based StreamSchema and bind with physical source {@link DataStream}</li>
+ *     <li>Define rich-featured Siddhi CEP execution plan with SQL-Like query for SiddhiStreamOperator</li>
+ *     <li>Transform and connect source DataStream to SiddhiStreamOperator</li>
+ *     <li>Register customizable siddhi plugins to extend built-in CEP functions</li>
+ * </ul>
+ * </p>
+ *
+ * @see SiddhiStream
+ * @see StreamSchema
+ * @see SiddhiStreamOperator
+ */
+@PublicEvolving
+public class SiddhiCEP {
+    private final StreamExecutionEnvironment executionEnvironment;
+    private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
+    private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
+    private final Map<String, Class<?>> extensions = new HashMap<>();
+
+    /**
+     * @param streamExecutionEnvironment Stream Execution Environment
+     */
+    private SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+        this.executionEnvironment = streamExecutionEnvironment;
+    }
+
+    /**
+     * @see DataStream
+     * @return Siddhi streamId and source DataStream mapping.
+     */
+    public Map<String, DataStream<?>> getDataStreams() {
+        return this.dataStreams;
+    }
+
+    /**
+     * @see SiddhiStreamSchema
+     * @return Siddhi streamId and stream schema mapping.
+     */
+    public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+        return this.dataStreamSchemas;
+    }
+
+    /**
+     * @param streamId Siddhi streamId to check.
+     * @return whether the given streamId is defined in current SiddhiCEP environment.
+     */
+    public boolean isStreamDefined(String streamId) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        return dataStreams.containsKey(streamId);
+    }
+
+    /**
+     * @return Registered siddhi extensions.
+     */
+    public Map<String, Class<?>> getExtensions() {
+        return this.extensions;
+    }
+
+    /**
+     * Check whether given streamId has been defined, if not, throw {@link UndefinedStreamException}
+     * @param streamId Siddhi streamId to check.
+     * @throws UndefinedStreamException throws if given streamId is not defined
+     */
+    public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+        Preconditions.checkNotNull(streamId,"streamId");
+        if (!isStreamDefined(streamId)) {
+            throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
+        }
+    }
+
+    /**
+     * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema,
+     * and select as initial source stream to connect to siddhi operator.
+     *
+     * @param streamId Unique siddhi streamId
+     * @param dataStream DataStream to bind to the siddhi stream.
+     * @param fieldNames Siddhi stream schema field names
+     *
+     * @see #registerStream(String, DataStream, String...)
+     * @see #from(String)
+     */
+    public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> dataStream, String... fieldNames) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        Preconditions.checkNotNull(dataStream,"dataStream");
+        Preconditions.checkNotNull(fieldNames,"fieldNames");
+        SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(dataStream.getExecutionEnvironment());
+        return environment.from(streamId, dataStream, fieldNames);
+    }
+
+    /**
+     * Register stream with unique <code>streaId</code>, source <code>dataStream</code> and schema fields,
+     * and select the registered stream as initial stream to connect to Siddhi Runtime.
+     *
+     * @see #registerStream(String, DataStream, String...)
+     * @see #from(String)
+     */
+    public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> dataStream, String... fieldNames) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        Preconditions.checkNotNull(dataStream,"dataStream");
+        Preconditions.checkNotNull(fieldNames,"fieldNames");
+        this.registerStream(streamId, dataStream, fieldNames);
+        return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+    }
+
+    /**
+     * Select stream by streamId  as initial stream to connect to Siddhi Runtime.
+     *
+     * @param streamId Siddhi Stream Name
+     * @param <T> Stream Generic Type
+     */
+    public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+    }
+
+    /**
+     * Select one stream and union other streams by streamId to connect to Siddhi Stream Operator.
+     *
+     * @param firstStreamId First siddhi streamId, which should be predefined in SiddhiCEP context.
+     * @param unionStreamIds Other siddhi streamIds to union, which should be predefined in SiddhiCEP context.
+     *
+     * @return The UnionSiddhiStream Builder
+     */
+    public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String... unionStreamIds) {
+        Preconditions.checkNotNull(firstStreamId,"firstStreamId");
+        Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
+        return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds);
+    }
+
+    /**
+     * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema.
+     *
+     * @param streamId Unique siddhi streamId
+     * @param dataStream DataStream to bind to the siddhi stream.
+     * @param fieldNames Siddhi stream schema field names
+     */
+    public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
+        Preconditions.checkNotNull(streamId,"streamId");
+        Preconditions.checkNotNull(dataStream,"dataStream");
+        Preconditions.checkNotNull(fieldNames,"fieldNames");
+        if (isStreamDefined(streamId)) {
+            throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
+        }
+        dataStreams.put(streamId, dataStream);
+        SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+        schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+        dataStreamSchemas.put(streamId, schema);
+    }
+
+    /**
+     * @return Current StreamExecutionEnvironment.
+     */
+    public StreamExecutionEnvironment getExecutionEnvironment() {
+        return executionEnvironment;
+    }
+
+    /**
+     * Register Siddhi CEP Extensions
+     *
+     * @see <a href="https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi">https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi</a>
+     * @param extensionName Unique siddhi extension name
+     * @param extensionClass Siddhi Extension class
+     */
+    public void registerExtension(String extensionName, Class<?> extensionClass) {
+        if (extensions.containsKey(extensionName)) {
+            throw new IllegalArgumentException("Extension named " + extensionName + " already registered");
+        }
+        extensions.put(extensionName, extensionClass);
+    }
+
+    /**
+     * Get registered source DataStream with Siddhi streamId.
+     *
+     * @param streamId Siddhi streamId
+     * @return The source DataStream registered with Siddhi streamId
+     */
+    public <T> DataStream<T> getDataStream(String streamId) {
+        if (this.dataStreams.containsKey(streamId)) {
+            return (DataStream<T>) this.dataStreams.get(streamId);
+        } else {
+            throw new UndefinedStreamException("Undefined stream " + streamId);
+        }
+    }
+
+    /**
+     * Create new SiddhiCEP instance.
+     *
+     * @param streamExecutionEnvironment StreamExecutionEnvironment
+     * @return New SiddhiCEP instance.
+     */
+    public static SiddhiCEP getSiddhiEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
+        return new SiddhiCEP(streamExecutionEnvironment);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
new file mode 100644
index 0000000..43d7436
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Stream API
+ */
+@PublicEvolving
+public abstract class SiddhiStream {
+    private final SiddhiCEP cepEnvironment;
+
+    /**
+     * @param cepEnvironment SiddhiCEP cepEnvironment.
+     */
+    public SiddhiStream(SiddhiCEP cepEnvironment) {
+        Preconditions.checkNotNull(cepEnvironment,"SiddhiCEP cepEnvironment is null");
+        this.cepEnvironment = cepEnvironment;
+    }
+
+    /**
+     * @return current SiddhiCEP cepEnvironment.
+     */
+    protected SiddhiCEP getCepEnvironment() {
+        return this.cepEnvironment;
+    }
+
+    /**
+     * @return Transform SiddhiStream to physical DataStream
+     */
+    protected abstract DataStream<Tuple2<String, Object>> toDataStream();
+
+    /**
+     * Convert DataStream&lt;T&gt; to DataStream&lt;Tuple2&lt;String,T&gt;&gt;.
+     * If it's KeyedStream. pass through original keySelector
+     */
+    protected <T> DataStream<Tuple2<String, Object>> convertDataStream(DataStream<T> dataStream, String streamId) {
+        final String streamIdInClosure = streamId;
+        DataStream<Tuple2<String, Object>> resultStream = dataStream.map(new MapFunction<T, Tuple2<String, Object>>() {
+            @Override
+            public Tuple2<String, Object> map(T value) throws Exception {
+                return Tuple2.of(streamIdInClosure, (Object) value);
+            }
+        });
+        if (dataStream instanceof KeyedStream) {
+            final KeySelector<T, Object> keySelector = ((KeyedStream<T, Object>) dataStream).getKeySelector();
+            final KeySelector<Tuple2<String, Object>, Object> keySelectorInClosure = new KeySelector<Tuple2<String, Object>, Object>() {
+                @Override
+                public Object getKey(Tuple2<String, Object> value) throws Exception {
+                    return keySelector.getKey((T) value.f1);
+                }
+            };
+            return resultStream.keyBy(keySelectorInClosure);
+        } else {
+            return resultStream;
+        }
+    }
+
+    /**
+     * ExecutableStream context to define execution logic, i.e. SiddhiCEP execution plan.
+     */
+    public abstract static class ExecutableStream extends SiddhiStream {
+        public ExecutableStream(SiddhiCEP environment) {
+            super(environment);
+        }
+
+        /**
+         * Siddhi Continuous Query Language (CQL)
+         *
+         * @param executionPlan Siddhi SQL-Like execution plan query
+         * @return ExecutionSiddhiStream context
+         */
+        public ExecutionSiddhiStream cql(String executionPlan) {
+            Preconditions.checkNotNull(executionPlan,"executionPlan");
+            return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, getCepEnvironment());
+        }
+    }
+
+    /**
+     * Initial Single Siddhi Stream Context
+     */
+    public static class SingleSiddhiStream<T> extends ExecutableStream {
+        private final String streamId;
+
+        public SingleSiddhiStream(String streamId, SiddhiCEP environment) {
+            super(environment);
+            environment.checkStreamDefined(streamId);
+            this.streamId = streamId;
+        }
+
+
+        /**
+         * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and as the first stream of {@link UnionSiddhiStream}
+         *
+         * @param streamId Unique siddhi streamId
+         * @param dataStream DataStream to bind to the siddhi stream.
+         * @param fieldNames Siddhi stream schema field names
+         *
+         * @return {@link UnionSiddhiStream} context
+         */
+        public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+            getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
+            return union(streamId);
+        }
+
+        /**
+         * @param streamIds Defined siddhi streamIds to union
+         * @return {@link UnionSiddhiStream} context
+         */
+        public UnionSiddhiStream<T> union(String... streamIds) {
+            Preconditions.checkNotNull(streamIds,"streamIds");
+            return new UnionSiddhiStream<T>(this.streamId, Arrays.asList(streamIds), this.getCepEnvironment());
+        }
+
+        @Override
+        protected DataStream<Tuple2<String, Object>> toDataStream() {
+            return convertDataStream(getCepEnvironment().getDataStream(this.streamId), this.streamId);
+        }
+    }
+
+    public static class UnionSiddhiStream<T> extends ExecutableStream {
+        private String firstStreamId;
+        private List<String> unionStreamIds;
+
+        public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, SiddhiCEP environment) {
+            super(environment);
+            Preconditions.checkNotNull(firstStreamId,"firstStreamId");
+            Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
+            environment.checkStreamDefined(firstStreamId);
+            for (String unionStreamId : unionStreamIds) {
+                environment.checkStreamDefined(unionStreamId);
+            }
+            this.firstStreamId = firstStreamId;
+            this.unionStreamIds = unionStreamIds;
+        }
+
+        /**
+         * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and continue to union it with current stream.
+         *
+         * @param streamId Unique siddhi streamId
+         * @param dataStream DataStream to bind to the siddhi stream.
+         * @param fieldNames Siddhi stream schema field names
+         *
+         * @return {@link UnionSiddhiStream} context
+         */
+        public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+            Preconditions.checkNotNull(streamId,"streamId");
+            Preconditions.checkNotNull(dataStream,"dataStream");
+            Preconditions.checkNotNull(fieldNames,"fieldNames");
+            getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
+            return union(streamId);
+        }
+
+        /**
+         * @param streamId another defined streamId to union with.
+         * @return {@link UnionSiddhiStream} context
+         */
+        public UnionSiddhiStream<T> union(String... streamId) {
+            List<String> newUnionStreamIds = new LinkedList<>();
+            newUnionStreamIds.addAll(unionStreamIds);
+            newUnionStreamIds.addAll(Arrays.asList(streamId));
+            return new UnionSiddhiStream<T>(this.firstStreamId, newUnionStreamIds, this.getCepEnvironment());
+        }
+
+        @Override
+        protected DataStream<Tuple2<String, Object>> toDataStream() {
+            final String localFirstStreamId = firstStreamId;
+            final List<String> localUnionStreamIds = this.unionStreamIds;
+            DataStream<Tuple2<String, Object>> dataStream = convertDataStream(getCepEnvironment().<T>getDataStream(localFirstStreamId), this.firstStreamId);
+            for (String unionStreamId : localUnionStreamIds) {
+                dataStream = dataStream.union(convertDataStream(getCepEnvironment().<T>getDataStream(unionStreamId), unionStreamId));
+            }
+            return dataStream;
+        }
+    }
+
+    public static class ExecutionSiddhiStream {
+        private final DataStream<Tuple2<String, Object>> dataStream;
+        private final SiddhiCEP environment;
+        private final String executionPlan;
+
+        public ExecutionSiddhiStream(DataStream<Tuple2<String, Object>> dataStream, String executionPlan, SiddhiCEP environment) {
+            this.executionPlan = executionPlan;
+            this.dataStream = dataStream;
+            this.environment = environment;
+        }
+
+        /**
+         * @param outStreamId The <code>streamId</code> to return as data stream.
+         * @param <T>         Type information should match with stream definition.
+         *                    During execution phase, it will automatically build type information based on stream definition.
+         * @return Return output stream as Tuple
+         * @see SiddhiTypeFactory
+         */
+        public <T extends Tuple> DataStream<T> returns(String outStreamId) {
+            SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+            siddhiContext.setExecutionPlan(executionPlan);
+            siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+            siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+            siddhiContext.setOutputStreamId(outStreamId);
+            siddhiContext.setExtensions(environment.getExtensions());
+            siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+            TypeInformation<T> typeInformation =
+                SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getFinalExecutionPlan(), outStreamId);
+            siddhiContext.setOutputStreamType(typeInformation);
+            return returnsInternal(siddhiContext);
+        }
+
+        /**
+         * @return Return output stream as <code>DataStream&lt;Map&lt;String,Object&gt;&gt;</code>,
+         * out type is <code>LinkedHashMap&lt;String,Object&gt;</code> and guarantee field order
+         * as defined in siddhi execution plan
+         * @see java.util.LinkedHashMap
+         */
+        public DataStream<Map<String, Object>> returnAsMap(String outStreamId) {
+            return this.returnsInternal(outStreamId, SiddhiTypeFactory.getMapTypeInformation());
+        }
+
+        /**
+         * @param outStreamId OutStreamId
+         * @param outType     Output type class
+         * @param <T>         Output type
+         * @return Return output stream as POJO class.
+         */
+        public <T> DataStream<T> returns(String outStreamId, Class<T> outType) {
+            TypeInformation<T> typeInformation = TypeExtractor.getForClass(outType);
+            return returnsInternal(outStreamId, typeInformation);
+        }
+
+        private <T> DataStream<T> returnsInternal(String outStreamId, TypeInformation<T> typeInformation) {
+            SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+            siddhiContext.setExecutionPlan(executionPlan);
+            siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+            siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+            siddhiContext.setOutputStreamId(outStreamId);
+            siddhiContext.setOutputStreamType(typeInformation);
+            siddhiContext.setExtensions(environment.getExtensions());
+            siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+            return returnsInternal(siddhiContext);
+        }
+
+        private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext) {
+            return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
new file mode 100644
index 0000000..f65cc81
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.exception;
+
+public class DuplicatedStreamException extends RuntimeException {
+    public DuplicatedStreamException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
new file mode 100644
index 0000000..26254c2
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.exception;
+
+public class UndefinedStreamException extends RuntimeException {
+    public UndefinedStreamException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
new file mode 100755
index 0000000..8cb6d67
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+/**
+ * <h1>Siddhi Runtime Operator</h1>
+ *
+ * A flink Stream Operator to integrate with native siddhi execution runtime, extension and type schema mechanism/
+ *
+ * <ul>
+ * <li>
+ * Create Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle.
+ * </li>
+ * <li>
+ * Connect Flink DataStreams with predefined Siddhi Stream according to unique streamId
+ * </li>
+ * <li>
+ * Convert native {@link StreamRecord} to Siddhi {@link org.wso2.siddhi.core.event.Event} according to {@link StreamSchema}, and send to Siddhi Runtime.
+ * </li>
+ * <li>
+ * Listen output callback event and convert as expected output type according to output {@link org.apache.flink.api.common.typeinfo.TypeInformation}, then output as typed DataStream.
+ * </li>
+ * </li>
+ * <li>
+ * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
+ * </li>
+ * <li>
+ * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
+ * </li>
+ * </ul>
+ *
+ * @param <IN>  Input Element Type
+ * @param <OUT> Output Element Type
+ */
+public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT>
+    implements OneInputStreamOperator<IN, OUT> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
+    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+    private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
+    private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";
+
+    private final SiddhiOperatorContext siddhiPlan;
+    private final String executionExpression;
+    private final boolean isProcessingTime;
+    private final Map<String, StreamElementSerializer<IN>> streamRecordSerializers;
+
+    private transient SiddhiManager siddhiManager;
+    private transient SiddhiAppRuntime siddhiRuntime;
+    private transient Map<String, InputHandler> inputStreamHandlers;
+
+    // queue to buffer out of order stream records
+    private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
+
+    private transient ListState<byte[]> siddhiRuntimeState;
+    private transient ListState<byte[]> queuedRecordsState;
+
+    /**
+     * @param siddhiPlan Siddhi CEP  Execution Plan
+     */
+    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+        validate(siddhiPlan);
+        this.executionExpression = siddhiPlan.getFinalExecutionPlan();
+        this.siddhiPlan = siddhiPlan;
+        this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+        this.streamRecordSerializers = new HashMap<>();
+
+        registerStreamRecordSerializers();
+    }
+
+    /**
+     * Register StreamRecordSerializer based on {@link StreamSchema}
+     */
+    private void registerStreamRecordSerializers() {
+        for (String streamId : this.siddhiPlan.getInputStreams()) {
+            streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
+        }
+    }
+
+    protected abstract StreamElementSerializer<IN> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig);
+
+    protected StreamElementSerializer<IN> getStreamRecordSerializer(String streamId) {
+        if (streamRecordSerializers.containsKey(streamId)) {
+            return streamRecordSerializers.get(streamId);
+        } else {
+            throw new UndefinedStreamException("Stream " + streamId + " not defined");
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        String streamId = getStreamId(element.getValue());
+        StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+
+        if (isProcessingTime) {
+            processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
+            this.checkpointSiddhiRuntimeState();
+        } else {
+            PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+            // event time processing
+            // we have to buffer the elements until we receive the proper watermark
+            if (getExecutionConfig().isObjectReuseEnabled()) {
+                // copy the StreamRecord so that it cannot be changed
+                priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
+            } else {
+                priorityQueue.offer(element);
+            }
+            this.checkpointRecordQueueState();
+        }
+    }
+
+    protected abstract void processEvent(String streamId, StreamSchema<IN> schema, IN value, long timestamp) throws Exception;
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+            StreamRecord<IN> streamRecord = priorityQueue.poll();
+            String streamId = getStreamId(streamRecord.getValue());
+            long timestamp = streamRecord.getTimestamp();
+            StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+            processEvent(streamId, schema, streamRecord.getValue(), timestamp);
+        }
+        output.emitWatermark(mark);
+    }
+
+    public abstract String getStreamId(IN record);
+
+    public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
+        return priorityQueue;
+    }
+
+    protected SiddhiAppRuntime getSiddhiRuntime() {
+        return this.siddhiRuntime;
+    }
+
+    public InputHandler getSiddhiInputHandler(String streamId) {
+        return inputStreamHandlers.get(streamId);
+    }
+
+    protected SiddhiOperatorContext getSiddhiPlan() {
+        return this.siddhiPlan;
+    }
+
+    @Override
+    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+        super.setup(containingTask, config, output);
+        if (priorityQueue == null) {
+            priorityQueue = new PriorityQueue<>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
+        }
+        startSiddhiRuntime();
+    }
+
+    /**
+     * Send input data to siddhi runtime
+     */
+    protected void send(String streamId, Object[] data, long timestamp) throws InterruptedException {
+        this.getSiddhiInputHandler(streamId).send(timestamp, data);
+    }
+
+    /**
+     * Validate execution plan during building DAG before submitting to execution environment and fail-fast.
+     */
+    private static void validate(final SiddhiOperatorContext siddhiPlan) {
+        SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager();
+        try {
+            siddhiManager.validateSiddhiApp(siddhiPlan.getFinalExecutionPlan());
+        } finally {
+            siddhiManager.shutdown();
+        }
+    }
+
+    /**
+     * Create and start execution runtime
+     */
+    private void startSiddhiRuntime() {
+        if (this.siddhiRuntime == null) {
+            this.siddhiManager = this.siddhiPlan.createSiddhiManager();
+            for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
+                this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
+            }
+            this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(executionExpression);
+            this.siddhiRuntime.start();
+            registerInputAndOutput(this.siddhiRuntime);
+            LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
+        } else {
+            throw new IllegalStateException("Siddhi has already been initialized");
+        }
+    }
+
+
+    private void shutdownSiddhiRuntime() {
+        if (this.siddhiRuntime != null) {
+            this.siddhiRuntime.shutdown();
+            LOGGER.info("Siddhi {} shutdown", this.siddhiRuntime.getName());
+            this.siddhiRuntime = null;
+            this.siddhiManager.shutdown();
+            this.siddhiManager = null;
+            this.inputStreamHandlers = null;
+        } else {
+            throw new IllegalStateException("Siddhi has already shutdown");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void registerInputAndOutput(SiddhiAppRuntime runtime) {
+        AbstractDefinition definition = this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId());
+        runtime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler<>(this.siddhiPlan.getOutputStreamType(), definition, this.output));
+        inputStreamHandlers = new HashMap<>();
+        for (String inputStreamId : this.siddhiPlan.getInputStreams()) {
+            inputStreamHandlers.put(inputStreamId, runtime.getInputHandler(inputStreamId));
+        }
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        shutdownSiddhiRuntime();
+        this.siddhiRuntimeState.clear();
+        super.dispose();
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        checkpointSiddhiRuntimeState();
+        checkpointRecordQueueState();
+    }
+
+    private void restoreState() throws Exception {
+        LOGGER.info("Restore siddhi state");
+        final Iterator<byte[]> siddhiState = siddhiRuntimeState.get().iterator();
+        if (siddhiState.hasNext()) {
+            this.siddhiRuntime.restore(siddhiState.next());
+        }
+
+        LOGGER.info("Restore queued records state");
+        final Iterator<byte[]> queueState = queuedRecordsState.get().iterator();
+        if (queueState.hasNext()) {
+            final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(queueState.next());
+            final DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(byteArrayInputStream);
+            try {
+                this.priorityQueue = restoreQueuerState(dataInputView);
+            } finally {
+                dataInputView.close();
+                byteArrayInputStream.close();
+            }
+        }
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        if (siddhiRuntimeState == null) {
+            siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(SIDDHI_RUNTIME_STATE_NAME,
+                    new BytePrimitiveArraySerializer()));
+        }
+        if (queuedRecordsState == null) {
+            queuedRecordsState = context.getOperatorStateStore().getListState(
+                new ListStateDescriptor<>(QUEUED_RECORDS_STATE_NAME, new BytePrimitiveArraySerializer()));
+        }
+        if (context.isRestored()) {
+            restoreState();
+        }
+    }
+
+
+    private void checkpointSiddhiRuntimeState() throws Exception {
+        this.siddhiRuntimeState.clear();
+        this.siddhiRuntimeState.add(this.siddhiRuntime.snapshot());
+        this.queuedRecordsState.clear();
+    }
+
+    private void checkpointRecordQueueState() throws Exception {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        final DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper(byteArrayOutputStream);
+        try {
+            snapshotQueueState(this.priorityQueue, dataOutputView);
+            this.queuedRecordsState.clear();
+            this.queuedRecordsState.add(byteArrayOutputStream.toByteArray());
+        } finally {
+            dataOutputView.close();
+            byteArrayOutputStream.close();
+        }
+    }
+
+    protected abstract void snapshotQueueState(PriorityQueue<StreamRecord<IN>> queue, DataOutputView dataOutputView) throws IOException;
+
+    protected abstract PriorityQueue<StreamRecord<IN>> restoreQueuerState(DataInputView dataInputView) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
new file mode 100644
index 0000000..f760938
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.core.SiddhiManager;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SiddhiCEP Operator Context Metadata including input/output stream (streamId, TypeInformation) as well execution plan query,
+ * and execution environment context like TimeCharacteristic and ExecutionConfig.
+ */
+public class SiddhiOperatorContext implements Serializable {
+    private ExecutionConfig executionConfig;
+    private Map<String, SiddhiStreamSchema<?>> inputStreamSchemas;
+    private final Map<String, Class<?>> siddhiExtensions;
+    private String outputStreamId;
+    private TypeInformation outputStreamType;
+    private TimeCharacteristic timeCharacteristic;
+    private String name;
+    private String executionPlan;
+
+    public SiddhiOperatorContext() {
+        inputStreamSchemas = new HashMap<>();
+        siddhiExtensions = new HashMap<>();
+    }
+
+    /**
+     * @param extensions siddhi extensions to register
+     */
+    public void setExtensions(Map<String, Class<?>> extensions) {
+        Preconditions.checkNotNull(extensions,"extensions");
+        siddhiExtensions.putAll(extensions);
+    }
+
+    /**
+     * @return registered siddhi extensions
+     */
+    public Map<String, Class<?>> getExtensions() {
+        return siddhiExtensions;
+    }
+
+    /**
+     * @return Siddhi Stream Operator Name in format of "Siddhi: execution query ... (query length)"
+     */
+    public String getName() {
+        if (this.name == null) {
+            if (executionPlan.length() > 100) {
+                return String.format("Siddhi: %s ... (%s)", executionPlan.substring(0, 100), executionPlan.length() - 100);
+            } else {
+                return String.format("Siddhi: %s", executionPlan);
+            }
+        } else {
+            return this.name;
+        }
+    }
+
+    /**
+     * @return Source siddhi stream IDs
+     */
+    public List<String> getInputStreams() {
+        Object[] keys = this.inputStreamSchemas.keySet().toArray();
+        List<String> result = new ArrayList<>(keys.length);
+        for (Object key : keys) {
+            result.add((String) key);
+        }
+        return result;
+    }
+
+    /**
+     * @return Siddhi CEP cql-like execution plan
+     */
+    public String getExecutionPlan() {
+        return executionPlan;
+    }
+
+    /**
+     * Stream definition + execution expression
+     */
+    public String getFinalExecutionPlan() {
+        Preconditions.checkNotNull(executionPlan, "Execution plan is not set");
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<String, SiddhiStreamSchema<?>> entry : inputStreamSchemas.entrySet()) {
+            sb.append(entry.getValue().getStreamDefinitionExpression(entry.getKey()));
+        }
+        sb.append(this.getExecutionPlan());
+        return sb.toString();
+    }
+
+    /**
+     * @return Siddhi Stream Operator output type information
+     */
+    public TypeInformation getOutputStreamType() {
+        return outputStreamType;
+    }
+
+    /**
+     * @return Siddhi output streamId for callback
+     */
+    public String getOutputStreamId() {
+        return outputStreamId;
+    }
+
+    /**
+     * @param inputStreamId Siddhi streamId
+     * @return StreamSchema for given siddhi streamId
+     *
+     * @throws UndefinedStreamException throws if stream is not defined
+     */
+    @SuppressWarnings("unchecked")
+    public <IN> StreamSchema<IN> getInputStreamSchema(String inputStreamId) {
+        Preconditions.checkNotNull(inputStreamId,"inputStreamId");
+
+        if (!inputStreamSchemas.containsKey(inputStreamId)) {
+            throw new UndefinedStreamException("Input stream: " + inputStreamId + " is not found");
+        }
+        return (StreamSchema<IN>) inputStreamSchemas.get(inputStreamId);
+    }
+
+    /**
+     * @param outputStreamId Siddhi output streamId, which must exist in siddhi execution plan
+     */
+    public void setOutputStreamId(String outputStreamId) {
+        Preconditions.checkNotNull(outputStreamId,"outputStreamId");
+        this.outputStreamId = outputStreamId;
+    }
+
+    /**
+     * @param outputStreamType Output stream TypeInformation
+     */
+    public void setOutputStreamType(TypeInformation outputStreamType) {
+        Preconditions.checkNotNull(outputStreamType,"outputStreamType");
+        this.outputStreamType = outputStreamType;
+    }
+
+    /**
+     * @return Returns execution environment TimeCharacteristic
+     */
+    public TimeCharacteristic getTimeCharacteristic() {
+        return timeCharacteristic;
+    }
+
+    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+        Preconditions.checkNotNull(timeCharacteristic,"timeCharacteristic");
+        this.timeCharacteristic = timeCharacteristic;
+    }
+
+    /**
+     * @param executionPlan Siddhi SQL-Like exeuction plan query
+     */
+    public void setExecutionPlan(String executionPlan) {
+        Preconditions.checkNotNull(executionPlan,"executionPlan");
+        this.executionPlan = executionPlan;
+    }
+
+    /**
+     * @return Returns input stream ID and  schema mapping
+     */
+    public Map<String, SiddhiStreamSchema<?>> getInputStreamSchemas() {
+        return inputStreamSchemas;
+    }
+
+    /**
+     * @param inputStreamSchemas input stream ID and  schema mapping
+     */
+    public void setInputStreamSchemas(Map<String, SiddhiStreamSchema<?>> inputStreamSchemas) {
+        Preconditions.checkNotNull(inputStreamSchemas,"inputStreamSchemas");
+        this.inputStreamSchemas = inputStreamSchemas;
+    }
+
+    public void setName(String name) {
+        Preconditions.checkNotNull(name,"name");
+        this.name = name;
+    }
+
+    /**
+     * @return Created new SiddhiManager instance with registered siddhi extensions
+     */
+    public SiddhiManager createSiddhiManager() {
+        SiddhiManager siddhiManager = new SiddhiManager();
+        for (Map.Entry<String, Class<?>> entry : getExtensions().entrySet()) {
+            siddhiManager.setExtension(entry.getKey(), entry.getValue());
+        }
+        return siddhiManager;
+    }
+
+    /**
+     * @return StreamExecutionEnvironment ExecutionConfig
+     */
+    public ExecutionConfig getExecutionConfig() {
+        return executionConfig;
+    }
+
+    /**
+     * @param executionConfig StreamExecutionEnvironment ExecutionConfig
+     */
+    public void setExecutionConfig(ExecutionConfig executionConfig) {
+        Preconditions.checkNotNull(executionConfig,"executionConfig");
+        this.executionConfig = executionConfig;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
new file mode 100755
index 0000000..5c54ad8
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Wrap input event in generic type of <code>IN</code> as Tuple2<String,IN>
+ */
+public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {
+
+    public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan) {
+        super(siddhiPlan);
+    }
+
+    @Override
+    protected StreamElementSerializer<Tuple2<String, IN>> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig) {
+        TypeInformation<Tuple2<String, IN>> tuple2TypeInformation = SiddhiTypeFactory.getStreamTupleTypeInformation((TypeInformation<IN>) streamSchema.getTypeInfo());
+        return new StreamElementSerializer<>(tuple2TypeInformation.createSerializer(executionConfig));
+    }
+
+    @Override
+    protected void processEvent(String streamId, StreamSchema<Tuple2<String, IN>> schema, Tuple2<String, IN> value, long timestamp) throws InterruptedException {
+        send(value.f0, getSiddhiPlan().getInputStreamSchema(value.f0).getStreamSerializer().getRow(value.f1), timestamp);
+    }
+
+    @Override
+    public String getStreamId(Tuple2<String, IN> record) {
+        return record.f0;
+    }
+
+    @Override
+    protected void snapshotQueueState(PriorityQueue<StreamRecord<Tuple2<String, IN>>> queue, DataOutputView dataOutputView) throws IOException {
+        dataOutputView.writeInt(queue.size());
+        for (StreamRecord<Tuple2<String, IN>> record : queue) {
+            String streamId = record.getValue().f0;
+            dataOutputView.writeUTF(streamId);
+            this.getStreamRecordSerializer(streamId).serialize(record, dataOutputView);
+        }
+    }
+
+    @Override
+    protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
+        int sizeOfQueue = dataInputView.readInt();
+        PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<>(sizeOfQueue);
+        for (int i = 0; i < sizeOfQueue; i++) {
+            String streamId = dataInputView.readUTF();
+            StreamElement streamElement = getStreamRecordSerializer(streamId).deserialize(dataInputView);
+            priorityQueue.offer(streamElement.<Tuple2<String, IN>>asRecord());
+        }
+        return priorityQueue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
new file mode 100755
index 0000000..7af37ce
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,
+ * according to output {@link TypeInformation} and siddhi schema {@link AbstractDefinition}
+ */
+public class StreamInMemOutputHandler<R> extends StreamCallback {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamInMemOutputHandler.class);
+
+    private final AbstractDefinition definition;
+    private final TypeInformation<R> typeInfo;
+    private final ObjectMapper objectMapper;
+
+
+    private final LinkedList<StreamRecord<R>> collectedRecords;
+
+    public StreamInMemOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition) {
+        this.typeInfo = typeInfo;
+        this.definition = definition;
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.collectedRecords = new LinkedList<>();
+    }
+
+    @Override
+    public void receive(Event[] events) {
+        for (Event event : events) {
+            if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
+                collectedRecords.add(new StreamRecord<R>((R) toMap(event), event.getTimestamp()));
+            } else if (typeInfo.isTupleType()) {
+                Tuple tuple = this.toTuple(event);
+                collectedRecords.add(new StreamRecord<R>((R) tuple, event.getTimestamp()));
+            } else if (typeInfo instanceof PojoTypeInfo) {
+                R obj;
+                try {
+                    obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
+                } catch (IllegalArgumentException ex) {
+                    LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
+                    throw ex;
+                }
+                collectedRecords.add(new StreamRecord<R>(obj, event.getTimestamp()));
+            } else {
+                throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
+            }
+        }
+    }
+
+
+    @Override
+    public synchronized void stopProcessing() {
+        super.stopProcessing();
+        this.collectedRecords.clear();
+    }
+
+    private Map<String, Object> toMap(Event event) {
+        Map<String, Object> map = new LinkedHashMap<>();
+        for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
+            map.put(definition.getAttributeNameArray()[i], event.getData(i));
+        }
+        return map;
+    }
+
+    private <T extends Tuple> T toTuple(Event event) {
+        return SiddhiTupleFactory.newTuple(event.getData());
+    }
+
+    public LinkedList<StreamRecord<R>> getCollectedRecords() {
+        return collectedRecords;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
new file mode 100755
index 0000000..8840dac
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,
+ * according to output {@link TypeInformation} and siddhi schema {@link AbstractDefinition}
+ */
+public class StreamOutputHandler<R> extends StreamCallback {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamOutputHandler.class);
+
+    private final AbstractDefinition definition;
+    private final Output<StreamRecord<R>> output;
+    private final TypeInformation<R> typeInfo;
+    private final ObjectMapper objectMapper;
+
+    public StreamOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition, Output<StreamRecord<R>> output) {
+        this.typeInfo = typeInfo;
+        this.definition = definition;
+        this.output = output;
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    @Override
+    public void receive(Event[] events) {
+        StreamRecord<R> reusableRecord = new StreamRecord<>(null, 0L);
+        for (Event event : events) {
+            if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
+                reusableRecord.replace(toMap(event), event.getTimestamp());
+                output.collect(reusableRecord);
+            } else if (typeInfo.isTupleType()) {
+                Tuple tuple = this.toTuple(event);
+                reusableRecord.replace(tuple, event.getTimestamp());
+                output.collect(reusableRecord);
+            } else if (typeInfo instanceof PojoTypeInfo) {
+                R obj;
+                try {
+                    obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
+                } catch (IllegalArgumentException ex) {
+                    LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
+                    throw ex;
+                }
+                reusableRecord.replace(obj, event.getTimestamp());
+                output.collect(reusableRecord);
+            } else {
+                throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
+            }
+        }
+    }
+
+
+    @Override
+    public synchronized void stopProcessing() {
+        super.stopProcessing();
+    }
+
+    private Map<String, Object> toMap(Event event) {
+        Map<String, Object> map = new LinkedHashMap<>();
+        for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
+            map.put(definition.getAttributeNameArray()[i], event.getData(i));
+        }
+        return map;
+    }
+
+    private <T extends Tuple> T toTuple(Event event) {
+        return SiddhiTupleFactory.newTuple(event.getData());
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
new file mode 100644
index 0000000..049681c
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Stream Record Timestamp Comparator
+ */
+public class StreamRecordComparator<IN> implements Comparator<StreamRecord<IN>>, Serializable {
+    private static final long serialVersionUID = 1581054988433915305L;
+
+    @Override
+    public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+        if (o1.getTimestamp() < o2.getTimestamp()) {
+            return -1;
+        } else if (o1.getTimestamp() > o2.getTimestamp()) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
new file mode 100644
index 0000000..6b1ceae
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <h1> Features </h1>
+ * <ul>
+ * <li>
+ * Integrate Siddhi CEP as an  stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
+ * <ul>
+ * <li>Filter</li>
+ * <li>Join</li>
+ * <li>Aggregation</li>
+ * <li>Group by</li>
+ * <li>Having</li>
+ * <li>Window</li>
+ * <li>Conditions and Expressions</li>
+ * <li>Pattern processing</li>
+ * <li>Sequence processing</li>
+ * <li>Event Tables</li>
+ * <li>...</li>
+ * </ul>
+ * </li>
+ * <li>
+ * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`)
+ * <ul>
+ * <li>Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.</li>
+ * <li>Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan</li>
+ * <li>Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema</li>
+ * </ul>
+ * </li>
+ * <li>
+ * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
+ * </li>
+ * <li>
+ * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
+ * </li>
+ * </ul>
+ * <p/>
+ * <h1>Example</h1>
+ * <pre>
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+ *
+ * cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
+ *
+ * cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
+ * cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");
+ *
+ * DataStream&lt;Tuple4&lt;Integer,String,Integer,String&gt;&gt; output = cep
+ *     .from("inputStream1").union("inputStream2")
+ *     .cql(
+ *         "from every s1 = inputStream1[id == 2] "
+ *          + " -> s2 = inputStream2[id == 3] "
+ *          + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
+ *          + "insert into outputStream"
+ *     )
+ *     .returns("outputStream");
+ *
+ * env.execute();
+ * </pre>
+ *
+ * @see <a href="https://github.com/wso2/siddhi">https://github.com/wso2/siddhi</a>
+ */
+package org.apache.flink.streaming.siddhi;

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
new file mode 100644
index 0000000..2a3a04c
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Siddhi specific Stream Schema.
+ *
+ * @param <T> Siddhi stream element type
+ */
+public class SiddhiStreamSchema<T> extends StreamSchema<T> {
+    private static final String DEFINE_STREAM_TEMPLATE = "define stream %s (%s);";
+
+    public SiddhiStreamSchema(TypeInformation<T> typeInfo, String... fieldNames) {
+        super(typeInfo, fieldNames);
+    }
+
+    public SiddhiStreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+        super(typeInfo, fieldIndexes, fieldNames);
+    }
+
+    public StreamDefinition getStreamDefinition(String streamId) {
+        StreamDefinition streamDefinition = StreamDefinition.id(streamId);
+        for (int i = 0; i < getFieldNames().length; i++) {
+            streamDefinition.attribute(getFieldNames()[i], SiddhiTypeFactory.getAttributeType(getFieldTypes()[i]));
+        }
+        return streamDefinition;
+    }
+
+    public String getStreamDefinitionExpression(StreamDefinition streamDefinition) {
+        List<String> columns = new ArrayList<>();
+        Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
+        for (Attribute attribute : streamDefinition.getAttributeList()) {
+            columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
+        }
+        return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
+    }
+
+    public String getStreamDefinitionExpression(String streamId) {
+        StreamDefinition streamDefinition = getStreamDefinition(streamId);
+        List<String> columns = new ArrayList<>();
+        Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
+        for (Attribute attribute : streamDefinition.getAttributeList()) {
+            columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
+        }
+        return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
+    }
+}