You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2017/11/26 15:13:28 UTC
[3/3] bahir-flink git commit: [BAHIR-144] Add flink-library-siddhi
[BAHIR-144] Add flink-library-siddhi
This closes #22
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/2f47eedc
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/2f47eedc
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/2f47eedc
Branch: refs/heads/BAHIR-144-merge
Commit: 2f47eedc03250b07bc75880b5700e56386ef64b5
Parents: 4f0179a
Author: Hao Chen <hc...@ebay.com>
Authored: Sat Nov 18 22:15:32 2017 +0800
Committer: Robert Metzger <rm...@apache.org>
Committed: Sun Nov 26 16:13:11 2017 +0100
----------------------------------------------------------------------
flink-library-siddhi/pom.xml | 113 ++++++
.../flink/streaming/siddhi/SiddhiCEP.java | 231 +++++++++++
.../flink/streaming/siddhi/SiddhiStream.java | 279 +++++++++++++
.../exception/DuplicatedStreamException.java | 24 ++
.../exception/UndefinedStreamException.java | 24 ++
.../siddhi/operator/AbstractSiddhiOperator.java | 338 ++++++++++++++++
.../siddhi/operator/SiddhiOperatorContext.java | 227 +++++++++++
.../siddhi/operator/SiddhiStreamOperator.java | 80 ++++
.../operator/StreamInMemOutputHandler.java | 104 +++++
.../siddhi/operator/StreamOutputHandler.java | 101 +++++
.../siddhi/operator/StreamRecordComparator.java | 41 ++
.../flink/streaming/siddhi/package-info.java | 78 ++++
.../siddhi/schema/SiddhiStreamSchema.java | 72 ++++
.../streaming/siddhi/schema/StreamSchema.java | 173 ++++++++
.../siddhi/schema/StreamSerializer.java | 76 ++++
.../siddhi/utils/SiddhiStreamFactory.java | 33 ++
.../siddhi/utils/SiddhiTupleFactory.java | 128 ++++++
.../siddhi/utils/SiddhiTypeFactory.java | 136 +++++++
.../flink/streaming/siddhi/SiddhiCEPITCase.java | 403 +++++++++++++++++++
.../extension/CustomPlusFunctionExtension.java | 107 +++++
.../siddhi/operator/SiddhiSyntaxTest.java | 83 ++++
.../schema/SiddhiExecutionPlanSchemaTest.java | 49 +++
.../siddhi/schema/StreamSchemaTest.java | 94 +++++
.../siddhi/schema/StreamSerializerTest.java | 40 ++
.../flink/streaming/siddhi/source/Event.java | 110 +++++
.../siddhi/source/RandomEventSource.java | 72 ++++
.../siddhi/source/RandomTupleSource.java | 74 ++++
.../siddhi/source/RandomWordSource.java | 111 +++++
.../siddhi/utils/SiddhiTupleFactoryTest.java | 46 +++
.../siddhi/utils/SiddhiTypeFactoryTest.java | 50 +++
.../src/test/resources/log4j-test.properties | 27 ++
.../src/test/resources/logback-test.xml | 34 ++
pom.xml | 1 +
33 files changed, 3559 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
new file mode 100644
index 0000000..91c6797
--- /dev/null
+++ b/flink-library-siddhi/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-flink-parent_2.11</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-library-siddhi_2.11</artifactId>
+ <name>flink-library-siddhi</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <siddhi.version>4.0.0-M120</siddhi.version>
+ </properties>
+
+ <dependencies>
+ <!-- core dependencies -->
+ <dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-core</artifactId>
+ <version>${siddhi.version}</version>
+ <exclusions>
+ <exclusion> <!-- declare the exclusion here -->
+ <groupId>org.apache.directory.jdbm</groupId>
+ <artifactId>apacheds-jdbm1</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-query-api</artifactId>
+ <version>${siddhi.version}</version>
+ <exclusions>
+ <exclusion> <!-- declare the exclusion here -->
+ <groupId>org.apache.directory.jdbm</groupId>
+ <artifactId>apacheds-jdbm1</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Core streaming API -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <repositories>
+ <repository>
+ <id>wso2-maven2-repository</id>
+ <name>WSO2 Maven2 Repository</name>
+ <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
+ </repository>
+ </repositories>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
new file mode 100644
index 0000000..a63dbf6
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Siddhi CEP Environment, provides utility methods to
+ *
+ * <ul>
+ * <li>Initialize SiddhiCEP environment based on {@link StreamExecutionEnvironment}</li>
+ * <li>Register {@link SiddhiStream} with field-based StreamSchema and bind with physical source {@link DataStream}</li>
+ * <li>Define rich-featured Siddhi CEP execution plan with SQL-Like query for SiddhiStreamOperator</li>
+ * <li>Transform and connect source DataStream to SiddhiStreamOperator</li>
+ * <li>Register customizable siddhi plugins to extend built-in CEP functions</li>
+ * </ul>
+ * </p>
+ *
+ * @see SiddhiStream
+ * @see StreamSchema
+ * @see SiddhiStreamOperator
+ */
+@PublicEvolving
+public class SiddhiCEP {
+ private final StreamExecutionEnvironment executionEnvironment;
+ private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
+ private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
+ private final Map<String, Class<?>> extensions = new HashMap<>();
+
+ /**
+ * @param streamExecutionEnvironment Stream Execution Environment
+ */
+ private SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+ this.executionEnvironment = streamExecutionEnvironment;
+ }
+
+ /**
+ * @see DataStream
+ * @return Siddhi streamId and source DataStream mapping.
+ */
+ public Map<String, DataStream<?>> getDataStreams() {
+ return this.dataStreams;
+ }
+
+ /**
+ * @see SiddhiStreamSchema
+ * @return Siddhi streamId and stream schema mapping.
+ */
+ public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+ return this.dataStreamSchemas;
+ }
+
+ /**
+ * @param streamId Siddhi streamId to check.
+ * @return whether the given streamId is defined in current SiddhiCEP environment.
+ */
+ public boolean isStreamDefined(String streamId) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ return dataStreams.containsKey(streamId);
+ }
+
+ /**
+ * @return Registered siddhi extensions.
+ */
+ public Map<String, Class<?>> getExtensions() {
+ return this.extensions;
+ }
+
+ /**
+ * Check whether given streamId has been defined, if not, throw {@link UndefinedStreamException}
+ * @param streamId Siddhi streamId to check.
+ * @throws UndefinedStreamException throws if given streamId is not defined
+ */
+ public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+ Preconditions.checkNotNull(streamId,"streamId");
+ if (!isStreamDefined(streamId)) {
+ throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
+ }
+ }
+
+ /**
+ * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema,
+ * and select as initial source stream to connect to siddhi operator.
+ *
+ * @param streamId Unique siddhi streamId
+ * @param dataStream DataStream to bind to the siddhi stream.
+ * @param fieldNames Siddhi stream schema field names
+ *
+ * @see #registerStream(String, DataStream, String...)
+ * @see #from(String)
+ */
+ public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> dataStream, String... fieldNames) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ Preconditions.checkNotNull(dataStream,"dataStream");
+ Preconditions.checkNotNull(fieldNames,"fieldNames");
+ SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(dataStream.getExecutionEnvironment());
+ return environment.from(streamId, dataStream, fieldNames);
+ }
+
+ /**
+ * Register stream with unique <code>streaId</code>, source <code>dataStream</code> and schema fields,
+ * and select the registered stream as initial stream to connect to Siddhi Runtime.
+ *
+ * @see #registerStream(String, DataStream, String...)
+ * @see #from(String)
+ */
+ public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> dataStream, String... fieldNames) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ Preconditions.checkNotNull(dataStream,"dataStream");
+ Preconditions.checkNotNull(fieldNames,"fieldNames");
+ this.registerStream(streamId, dataStream, fieldNames);
+ return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+ }
+
+ /**
+ * Select stream by streamId as initial stream to connect to Siddhi Runtime.
+ *
+ * @param streamId Siddhi Stream Name
+ * @param <T> Stream Generic Type
+ */
+ public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+ }
+
+ /**
+ * Select one stream and union other streams by streamId to connect to Siddhi Stream Operator.
+ *
+ * @param firstStreamId First siddhi streamId, which should be predefined in SiddhiCEP context.
+ * @param unionStreamIds Other siddhi streamIds to union, which should be predefined in SiddhiCEP context.
+ *
+ * @return The UnionSiddhiStream Builder
+ */
+ public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String... unionStreamIds) {
+ Preconditions.checkNotNull(firstStreamId,"firstStreamId");
+ Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
+ return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds);
+ }
+
+ /**
+ * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema.
+ *
+ * @param streamId Unique siddhi streamId
+ * @param dataStream DataStream to bind to the siddhi stream.
+ * @param fieldNames Siddhi stream schema field names
+ */
+ public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ Preconditions.checkNotNull(dataStream,"dataStream");
+ Preconditions.checkNotNull(fieldNames,"fieldNames");
+ if (isStreamDefined(streamId)) {
+ throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
+ }
+ dataStreams.put(streamId, dataStream);
+ SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+ schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+ dataStreamSchemas.put(streamId, schema);
+ }
+
+ /**
+ * @return Current StreamExecutionEnvironment.
+ */
+ public StreamExecutionEnvironment getExecutionEnvironment() {
+ return executionEnvironment;
+ }
+
+ /**
+ * Register Siddhi CEP Extensions
+ *
+ * @see <a href="https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi">https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi</a>
+ * @param extensionName Unique siddhi extension name
+ * @param extensionClass Siddhi Extension class
+ */
+ public void registerExtension(String extensionName, Class<?> extensionClass) {
+ if (extensions.containsKey(extensionName)) {
+ throw new IllegalArgumentException("Extension named " + extensionName + " already registered");
+ }
+ extensions.put(extensionName, extensionClass);
+ }
+
+ /**
+ * Get registered source DataStream with Siddhi streamId.
+ *
+ * @param streamId Siddhi streamId
+ * @return The source DataStream registered with Siddhi streamId
+ */
+ public <T> DataStream<T> getDataStream(String streamId) {
+ if (this.dataStreams.containsKey(streamId)) {
+ return (DataStream<T>) this.dataStreams.get(streamId);
+ } else {
+ throw new UndefinedStreamException("Undefined stream " + streamId);
+ }
+ }
+
+ /**
+ * Create new SiddhiCEP instance.
+ *
+ * @param streamExecutionEnvironment StreamExecutionEnvironment
+ * @return New SiddhiCEP instance.
+ */
+ public static SiddhiCEP getSiddhiEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
+ return new SiddhiCEP(streamExecutionEnvironment);
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
new file mode 100644
index 0000000..43d7436
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Stream API
+ */
+@PublicEvolving
+public abstract class SiddhiStream {
+ private final SiddhiCEP cepEnvironment;
+
+ /**
+ * @param cepEnvironment SiddhiCEP cepEnvironment.
+ */
+ public SiddhiStream(SiddhiCEP cepEnvironment) {
+ Preconditions.checkNotNull(cepEnvironment,"SiddhiCEP cepEnvironment is null");
+ this.cepEnvironment = cepEnvironment;
+ }
+
+ /**
+ * @return current SiddhiCEP cepEnvironment.
+ */
+ protected SiddhiCEP getCepEnvironment() {
+ return this.cepEnvironment;
+ }
+
+ /**
+ * @return Transform SiddhiStream to physical DataStream
+ */
+ protected abstract DataStream<Tuple2<String, Object>> toDataStream();
+
+ /**
+ * Convert DataStream<T> to DataStream<Tuple2<String,T>>.
+ * If it's KeyedStream. pass through original keySelector
+ */
+ protected <T> DataStream<Tuple2<String, Object>> convertDataStream(DataStream<T> dataStream, String streamId) {
+ final String streamIdInClosure = streamId;
+ DataStream<Tuple2<String, Object>> resultStream = dataStream.map(new MapFunction<T, Tuple2<String, Object>>() {
+ @Override
+ public Tuple2<String, Object> map(T value) throws Exception {
+ return Tuple2.of(streamIdInClosure, (Object) value);
+ }
+ });
+ if (dataStream instanceof KeyedStream) {
+ final KeySelector<T, Object> keySelector = ((KeyedStream<T, Object>) dataStream).getKeySelector();
+ final KeySelector<Tuple2<String, Object>, Object> keySelectorInClosure = new KeySelector<Tuple2<String, Object>, Object>() {
+ @Override
+ public Object getKey(Tuple2<String, Object> value) throws Exception {
+ return keySelector.getKey((T) value.f1);
+ }
+ };
+ return resultStream.keyBy(keySelectorInClosure);
+ } else {
+ return resultStream;
+ }
+ }
+
+ /**
+ * ExecutableStream context to define execution logic, i.e. SiddhiCEP execution plan.
+ */
+ public abstract static class ExecutableStream extends SiddhiStream {
+ public ExecutableStream(SiddhiCEP environment) {
+ super(environment);
+ }
+
+ /**
+ * Siddhi Continuous Query Language (CQL)
+ *
+ * @param executionPlan Siddhi SQL-Like execution plan query
+ * @return ExecutionSiddhiStream context
+ */
+ public ExecutionSiddhiStream cql(String executionPlan) {
+ Preconditions.checkNotNull(executionPlan,"executionPlan");
+ return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, getCepEnvironment());
+ }
+ }
+
+ /**
+ * Initial Single Siddhi Stream Context
+ */
+ public static class SingleSiddhiStream<T> extends ExecutableStream {
+ private final String streamId;
+
+ public SingleSiddhiStream(String streamId, SiddhiCEP environment) {
+ super(environment);
+ environment.checkStreamDefined(streamId);
+ this.streamId = streamId;
+ }
+
+
+ /**
+ * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and as the first stream of {@link UnionSiddhiStream}
+ *
+ * @param streamId Unique siddhi streamId
+ * @param dataStream DataStream to bind to the siddhi stream.
+ * @param fieldNames Siddhi stream schema field names
+ *
+ * @return {@link UnionSiddhiStream} context
+ */
+ public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+ getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
+ return union(streamId);
+ }
+
+ /**
+ * @param streamIds Defined siddhi streamIds to union
+ * @return {@link UnionSiddhiStream} context
+ */
+ public UnionSiddhiStream<T> union(String... streamIds) {
+ Preconditions.checkNotNull(streamIds,"streamIds");
+ return new UnionSiddhiStream<T>(this.streamId, Arrays.asList(streamIds), this.getCepEnvironment());
+ }
+
+ @Override
+ protected DataStream<Tuple2<String, Object>> toDataStream() {
+ return convertDataStream(getCepEnvironment().getDataStream(this.streamId), this.streamId);
+ }
+ }
+
+ public static class UnionSiddhiStream<T> extends ExecutableStream {
+ private String firstStreamId;
+ private List<String> unionStreamIds;
+
+ public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, SiddhiCEP environment) {
+ super(environment);
+ Preconditions.checkNotNull(firstStreamId,"firstStreamId");
+ Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
+ environment.checkStreamDefined(firstStreamId);
+ for (String unionStreamId : unionStreamIds) {
+ environment.checkStreamDefined(unionStreamId);
+ }
+ this.firstStreamId = firstStreamId;
+ this.unionStreamIds = unionStreamIds;
+ }
+
+ /**
+ * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and continue to union it with current stream.
+ *
+ * @param streamId Unique siddhi streamId
+ * @param dataStream DataStream to bind to the siddhi stream.
+ * @param fieldNames Siddhi stream schema field names
+ *
+ * @return {@link UnionSiddhiStream} context
+ */
+ public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
+ Preconditions.checkNotNull(streamId,"streamId");
+ Preconditions.checkNotNull(dataStream,"dataStream");
+ Preconditions.checkNotNull(fieldNames,"fieldNames");
+ getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
+ return union(streamId);
+ }
+
+ /**
+ * @param streamId another defined streamId to union with.
+ * @return {@link UnionSiddhiStream} context
+ */
+ public UnionSiddhiStream<T> union(String... streamId) {
+ List<String> newUnionStreamIds = new LinkedList<>();
+ newUnionStreamIds.addAll(unionStreamIds);
+ newUnionStreamIds.addAll(Arrays.asList(streamId));
+ return new UnionSiddhiStream<T>(this.firstStreamId, newUnionStreamIds, this.getCepEnvironment());
+ }
+
+ @Override
+ protected DataStream<Tuple2<String, Object>> toDataStream() {
+ final String localFirstStreamId = firstStreamId;
+ final List<String> localUnionStreamIds = this.unionStreamIds;
+ DataStream<Tuple2<String, Object>> dataStream = convertDataStream(getCepEnvironment().<T>getDataStream(localFirstStreamId), this.firstStreamId);
+ for (String unionStreamId : localUnionStreamIds) {
+ dataStream = dataStream.union(convertDataStream(getCepEnvironment().<T>getDataStream(unionStreamId), unionStreamId));
+ }
+ return dataStream;
+ }
+ }
+
+ public static class ExecutionSiddhiStream {
+ private final DataStream<Tuple2<String, Object>> dataStream;
+ private final SiddhiCEP environment;
+ private final String executionPlan;
+
+ public ExecutionSiddhiStream(DataStream<Tuple2<String, Object>> dataStream, String executionPlan, SiddhiCEP environment) {
+ this.executionPlan = executionPlan;
+ this.dataStream = dataStream;
+ this.environment = environment;
+ }
+
+ /**
+ * @param outStreamId The <code>streamId</code> to return as data stream.
+ * @param <T> Type information should match with stream definition.
+ * During execution phase, it will automatically build type information based on stream definition.
+ * @return Return output stream as Tuple
+ * @see SiddhiTypeFactory
+ */
+ public <T extends Tuple> DataStream<T> returns(String outStreamId) {
+ SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+ siddhiContext.setExecutionPlan(executionPlan);
+ siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+ siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+ siddhiContext.setOutputStreamId(outStreamId);
+ siddhiContext.setExtensions(environment.getExtensions());
+ siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+ TypeInformation<T> typeInformation =
+ SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getFinalExecutionPlan(), outStreamId);
+ siddhiContext.setOutputStreamType(typeInformation);
+ return returnsInternal(siddhiContext);
+ }
+
+ /**
+ * @return Return output stream as <code>DataStream<Map<String,Object>></code>,
+ * out type is <code>LinkedHashMap<String,Object></code> and guarantee field order
+ * as defined in siddhi execution plan
+ * @see java.util.LinkedHashMap
+ */
+ public DataStream<Map<String, Object>> returnAsMap(String outStreamId) {
+ return this.returnsInternal(outStreamId, SiddhiTypeFactory.getMapTypeInformation());
+ }
+
+ /**
+ * @param outStreamId OutStreamId
+ * @param outType Output type class
+ * @param <T> Output type
+ * @return Return output stream as POJO class.
+ */
+ public <T> DataStream<T> returns(String outStreamId, Class<T> outType) {
+ TypeInformation<T> typeInformation = TypeExtractor.getForClass(outType);
+ return returnsInternal(outStreamId, typeInformation);
+ }
+
+ private <T> DataStream<T> returnsInternal(String outStreamId, TypeInformation<T> typeInformation) {
+ SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
+ siddhiContext.setExecutionPlan(executionPlan);
+ siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
+ siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
+ siddhiContext.setOutputStreamId(outStreamId);
+ siddhiContext.setOutputStreamType(typeInformation);
+ siddhiContext.setExtensions(environment.getExtensions());
+ siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
+ return returnsInternal(siddhiContext);
+ }
+
+ private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext) {
+ return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
new file mode 100644
index 0000000..f65cc81
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.exception;
+
+public class DuplicatedStreamException extends RuntimeException {
+ public DuplicatedStreamException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
new file mode 100644
index 0000000..26254c2
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.exception;
+
+public class UndefinedStreamException extends RuntimeException {
+ public UndefinedStreamException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
new file mode 100755
index 0000000..8cb6d67
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+/**
+ * <h1>Siddhi Runtime Operator</h1>
+ *
+ * A flink Stream Operator to integrate with native siddhi execution runtime, extension and type schema mechanism/
+ *
+ * <ul>
+ * <li>
+ * Create Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle.
+ * </li>
+ * <li>
+ * Connect Flink DataStreams with predefined Siddhi Stream according to unique streamId
+ * </li>
+ * <li>
+ * Convert native {@link StreamRecord} to Siddhi {@link org.wso2.siddhi.core.event.Event} according to {@link StreamSchema}, and send to Siddhi Runtime.
+ * </li>
+ * <li>
+ * Listen output callback event and convert as expected output type according to output {@link org.apache.flink.api.common.typeinfo.TypeInformation}, then output as typed DataStream.
+ * </li>
+ * </li>
+ * <li>
+ * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
+ * </li>
+ * <li>
+ * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
+ * </li>
+ * </ul>
+ *
+ * @param <IN> Input Element Type
+ * @param <OUT> Output Element Type
+ */
+public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT>
+ implements OneInputStreamOperator<IN, OUT> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
+ private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+ private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
+ private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";
+
+ private final SiddhiOperatorContext siddhiPlan;
+ private final String executionExpression;
+ private final boolean isProcessingTime;
+ private final Map<String, StreamElementSerializer<IN>> streamRecordSerializers;
+
+ private transient SiddhiManager siddhiManager;
+ private transient SiddhiAppRuntime siddhiRuntime;
+ private transient Map<String, InputHandler> inputStreamHandlers;
+
+ // queue to buffer out of order stream records
+ private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
+
+ private transient ListState<byte[]> siddhiRuntimeState;
+ private transient ListState<byte[]> queuedRecordsState;
+
+ /**
+ * @param siddhiPlan Siddhi CEP Execution Plan
+ */
+ public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+ validate(siddhiPlan);
+ this.executionExpression = siddhiPlan.getFinalExecutionPlan();
+ this.siddhiPlan = siddhiPlan;
+ this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+ this.streamRecordSerializers = new HashMap<>();
+
+ registerStreamRecordSerializers();
+ }
+
+ /**
+ * Register StreamRecordSerializer based on {@link StreamSchema}
+ */
+ private void registerStreamRecordSerializers() {
+ for (String streamId : this.siddhiPlan.getInputStreams()) {
+ streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
+ }
+ }
+
+ protected abstract StreamElementSerializer<IN> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig);
+
+ protected StreamElementSerializer<IN> getStreamRecordSerializer(String streamId) {
+ if (streamRecordSerializers.containsKey(streamId)) {
+ return streamRecordSerializers.get(streamId);
+ } else {
+ throw new UndefinedStreamException("Stream " + streamId + " not defined");
+ }
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ String streamId = getStreamId(element.getValue());
+ StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+
+ if (isProcessingTime) {
+ processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
+ this.checkpointSiddhiRuntimeState();
+ } else {
+ PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+ // event time processing
+ // we have to buffer the elements until we receive the proper watermark
+ if (getExecutionConfig().isObjectReuseEnabled()) {
+ // copy the StreamRecord so that it cannot be changed
+ priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
+ } else {
+ priorityQueue.offer(element);
+ }
+ this.checkpointRecordQueueState();
+ }
+ }
+
+ protected abstract void processEvent(String streamId, StreamSchema<IN> schema, IN value, long timestamp) throws Exception;
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+ StreamRecord<IN> streamRecord = priorityQueue.poll();
+ String streamId = getStreamId(streamRecord.getValue());
+ long timestamp = streamRecord.getTimestamp();
+ StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);
+ processEvent(streamId, schema, streamRecord.getValue(), timestamp);
+ }
+ output.emitWatermark(mark);
+ }
+
+ public abstract String getStreamId(IN record);
+
+ public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
+ return priorityQueue;
+ }
+
+ protected SiddhiAppRuntime getSiddhiRuntime() {
+ return this.siddhiRuntime;
+ }
+
+ public InputHandler getSiddhiInputHandler(String streamId) {
+ return inputStreamHandlers.get(streamId);
+ }
+
+ protected SiddhiOperatorContext getSiddhiPlan() {
+ return this.siddhiPlan;
+ }
+
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+ super.setup(containingTask, config, output);
+ if (priorityQueue == null) {
+ priorityQueue = new PriorityQueue<>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
+ }
+ startSiddhiRuntime();
+ }
+
+ /**
+ * Send input data to siddhi runtime
+ */
+ protected void send(String streamId, Object[] data, long timestamp) throws InterruptedException {
+ this.getSiddhiInputHandler(streamId).send(timestamp, data);
+ }
+
+ /**
+ * Validate execution plan during building DAG before submitting to execution environment and fail-fast.
+ */
+ private static void validate(final SiddhiOperatorContext siddhiPlan) {
+ SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager();
+ try {
+ siddhiManager.validateSiddhiApp(siddhiPlan.getFinalExecutionPlan());
+ } finally {
+ siddhiManager.shutdown();
+ }
+ }
+
+ /**
+ * Create and start execution runtime
+ */
+ private void startSiddhiRuntime() {
+ if (this.siddhiRuntime == null) {
+ this.siddhiManager = this.siddhiPlan.createSiddhiManager();
+ for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
+ this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
+ }
+ this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(executionExpression);
+ this.siddhiRuntime.start();
+ registerInputAndOutput(this.siddhiRuntime);
+ LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
+ } else {
+ throw new IllegalStateException("Siddhi has already been initialized");
+ }
+ }
+
+
+ private void shutdownSiddhiRuntime() {
+ if (this.siddhiRuntime != null) {
+ this.siddhiRuntime.shutdown();
+ LOGGER.info("Siddhi {} shutdown", this.siddhiRuntime.getName());
+ this.siddhiRuntime = null;
+ this.siddhiManager.shutdown();
+ this.siddhiManager = null;
+ this.inputStreamHandlers = null;
+ } else {
+ throw new IllegalStateException("Siddhi has already shutdown");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void registerInputAndOutput(SiddhiAppRuntime runtime) {
+ AbstractDefinition definition = this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId());
+ runtime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler<>(this.siddhiPlan.getOutputStreamType(), definition, this.output));
+ inputStreamHandlers = new HashMap<>();
+ for (String inputStreamId : this.siddhiPlan.getInputStreams()) {
+ inputStreamHandlers.put(inputStreamId, runtime.getInputHandler(inputStreamId));
+ }
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ shutdownSiddhiRuntime();
+ this.siddhiRuntimeState.clear();
+ super.dispose();
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ checkpointSiddhiRuntimeState();
+ checkpointRecordQueueState();
+ }
+
+ private void restoreState() throws Exception {
+ LOGGER.info("Restore siddhi state");
+ final Iterator<byte[]> siddhiState = siddhiRuntimeState.get().iterator();
+ if (siddhiState.hasNext()) {
+ this.siddhiRuntime.restore(siddhiState.next());
+ }
+
+ LOGGER.info("Restore queued records state");
+ final Iterator<byte[]> queueState = queuedRecordsState.get().iterator();
+ if (queueState.hasNext()) {
+ final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(queueState.next());
+ final DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(byteArrayInputStream);
+ try {
+ this.priorityQueue = restoreQueuerState(dataInputView);
+ } finally {
+ dataInputView.close();
+ byteArrayInputStream.close();
+ }
+ }
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+ if (siddhiRuntimeState == null) {
+ siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(SIDDHI_RUNTIME_STATE_NAME,
+ new BytePrimitiveArraySerializer()));
+ }
+ if (queuedRecordsState == null) {
+ queuedRecordsState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(QUEUED_RECORDS_STATE_NAME, new BytePrimitiveArraySerializer()));
+ }
+ if (context.isRestored()) {
+ restoreState();
+ }
+ }
+
+
+ private void checkpointSiddhiRuntimeState() throws Exception {
+ this.siddhiRuntimeState.clear();
+ this.siddhiRuntimeState.add(this.siddhiRuntime.snapshot());
+ this.queuedRecordsState.clear();
+ }
+
+ private void checkpointRecordQueueState() throws Exception {
+ final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ final DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper(byteArrayOutputStream);
+ try {
+ snapshotQueueState(this.priorityQueue, dataOutputView);
+ this.queuedRecordsState.clear();
+ this.queuedRecordsState.add(byteArrayOutputStream.toByteArray());
+ } finally {
+ dataOutputView.close();
+ byteArrayOutputStream.close();
+ }
+ }
+
+ protected abstract void snapshotQueueState(PriorityQueue<StreamRecord<IN>> queue, DataOutputView dataOutputView) throws IOException;
+
+ protected abstract PriorityQueue<StreamRecord<IN>> restoreQueuerState(DataInputView dataInputView) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
new file mode 100644
index 0000000..f760938
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.core.SiddhiManager;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SiddhiCEP Operator Context Metadata including input/output stream (streamId, TypeInformation) as well execution plan query,
+ * and execution environment context like TimeCharacteristic and ExecutionConfig.
+ */
+public class SiddhiOperatorContext implements Serializable {
+ private ExecutionConfig executionConfig;
+ private Map<String, SiddhiStreamSchema<?>> inputStreamSchemas;
+ private final Map<String, Class<?>> siddhiExtensions;
+ private String outputStreamId;
+ private TypeInformation outputStreamType;
+ private TimeCharacteristic timeCharacteristic;
+ private String name;
+ private String executionPlan;
+
+ public SiddhiOperatorContext() {
+ inputStreamSchemas = new HashMap<>();
+ siddhiExtensions = new HashMap<>();
+ }
+
+ /**
+ * @param extensions siddhi extensions to register
+ */
+ public void setExtensions(Map<String, Class<?>> extensions) {
+ Preconditions.checkNotNull(extensions,"extensions");
+ siddhiExtensions.putAll(extensions);
+ }
+
+ /**
+ * @return registered siddhi extensions
+ */
+ public Map<String, Class<?>> getExtensions() {
+ return siddhiExtensions;
+ }
+
+ /**
+ * @return Siddhi Stream Operator Name in format of "Siddhi: execution query ... (query length)"
+ */
+ public String getName() {
+ if (this.name == null) {
+ if (executionPlan.length() > 100) {
+ return String.format("Siddhi: %s ... (%s)", executionPlan.substring(0, 100), executionPlan.length() - 100);
+ } else {
+ return String.format("Siddhi: %s", executionPlan);
+ }
+ } else {
+ return this.name;
+ }
+ }
+
+ /**
+ * @return Source siddhi stream IDs
+ */
+ public List<String> getInputStreams() {
+ Object[] keys = this.inputStreamSchemas.keySet().toArray();
+ List<String> result = new ArrayList<>(keys.length);
+ for (Object key : keys) {
+ result.add((String) key);
+ }
+ return result;
+ }
+
+ /**
+ * @return Siddhi CEP cql-like execution plan
+ */
+ public String getExecutionPlan() {
+ return executionPlan;
+ }
+
+ /**
+ * Stream definition + execution expression
+ */
+ public String getFinalExecutionPlan() {
+ Preconditions.checkNotNull(executionPlan, "Execution plan is not set");
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, SiddhiStreamSchema<?>> entry : inputStreamSchemas.entrySet()) {
+ sb.append(entry.getValue().getStreamDefinitionExpression(entry.getKey()));
+ }
+ sb.append(this.getExecutionPlan());
+ return sb.toString();
+ }
+
+ /**
+ * @return Siddhi Stream Operator output type information
+ */
+ public TypeInformation getOutputStreamType() {
+ return outputStreamType;
+ }
+
+ /**
+ * @return Siddhi output streamId for callback
+ */
+ public String getOutputStreamId() {
+ return outputStreamId;
+ }
+
+ /**
+ * @param inputStreamId Siddhi streamId
+ * @return StreamSchema for given siddhi streamId
+ *
+ * @throws UndefinedStreamException throws if stream is not defined
+ */
+ @SuppressWarnings("unchecked")
+ public <IN> StreamSchema<IN> getInputStreamSchema(String inputStreamId) {
+ Preconditions.checkNotNull(inputStreamId,"inputStreamId");
+
+ if (!inputStreamSchemas.containsKey(inputStreamId)) {
+ throw new UndefinedStreamException("Input stream: " + inputStreamId + " is not found");
+ }
+ return (StreamSchema<IN>) inputStreamSchemas.get(inputStreamId);
+ }
+
+ /**
+ * @param outputStreamId Siddhi output streamId, which must exist in siddhi execution plan
+ */
+ public void setOutputStreamId(String outputStreamId) {
+ Preconditions.checkNotNull(outputStreamId,"outputStreamId");
+ this.outputStreamId = outputStreamId;
+ }
+
+ /**
+ * @param outputStreamType Output stream TypeInformation
+ */
+ public void setOutputStreamType(TypeInformation outputStreamType) {
+ Preconditions.checkNotNull(outputStreamType,"outputStreamType");
+ this.outputStreamType = outputStreamType;
+ }
+
+ /**
+ * @return Returns execution environment TimeCharacteristic
+ */
+ public TimeCharacteristic getTimeCharacteristic() {
+ return timeCharacteristic;
+ }
+
+ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+ Preconditions.checkNotNull(timeCharacteristic,"timeCharacteristic");
+ this.timeCharacteristic = timeCharacteristic;
+ }
+
+ /**
+ * @param executionPlan Siddhi SQL-Like exeuction plan query
+ */
+ public void setExecutionPlan(String executionPlan) {
+ Preconditions.checkNotNull(executionPlan,"executionPlan");
+ this.executionPlan = executionPlan;
+ }
+
+ /**
+ * @return Returns input stream ID and schema mapping
+ */
+ public Map<String, SiddhiStreamSchema<?>> getInputStreamSchemas() {
+ return inputStreamSchemas;
+ }
+
+ /**
+ * @param inputStreamSchemas input stream ID and schema mapping
+ */
+ public void setInputStreamSchemas(Map<String, SiddhiStreamSchema<?>> inputStreamSchemas) {
+ Preconditions.checkNotNull(inputStreamSchemas,"inputStreamSchemas");
+ this.inputStreamSchemas = inputStreamSchemas;
+ }
+
+ public void setName(String name) {
+ Preconditions.checkNotNull(name,"name");
+ this.name = name;
+ }
+
+ /**
+ * @return Created new SiddhiManager instance with registered siddhi extensions
+ */
+ public SiddhiManager createSiddhiManager() {
+ SiddhiManager siddhiManager = new SiddhiManager();
+ for (Map.Entry<String, Class<?>> entry : getExtensions().entrySet()) {
+ siddhiManager.setExtension(entry.getKey(), entry.getValue());
+ }
+ return siddhiManager;
+ }
+
+ /**
+ * @return StreamExecutionEnvironment ExecutionConfig
+ */
+ public ExecutionConfig getExecutionConfig() {
+ return executionConfig;
+ }
+
+ /**
+ * @param executionConfig StreamExecutionEnvironment ExecutionConfig
+ */
+ public void setExecutionConfig(ExecutionConfig executionConfig) {
+ Preconditions.checkNotNull(executionConfig,"executionConfig");
+ this.executionConfig = executionConfig;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
new file mode 100755
index 0000000..5c54ad8
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Wrap input event in generic type of <code>IN</code> as Tuple2<String,IN>
+ */
+public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {
+
+ public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan) {
+ super(siddhiPlan);
+ }
+
+ @Override
+ protected StreamElementSerializer<Tuple2<String, IN>> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig) {
+ TypeInformation<Tuple2<String, IN>> tuple2TypeInformation = SiddhiTypeFactory.getStreamTupleTypeInformation((TypeInformation<IN>) streamSchema.getTypeInfo());
+ return new StreamElementSerializer<>(tuple2TypeInformation.createSerializer(executionConfig));
+ }
+
+ @Override
+ protected void processEvent(String streamId, StreamSchema<Tuple2<String, IN>> schema, Tuple2<String, IN> value, long timestamp) throws InterruptedException {
+ send(value.f0, getSiddhiPlan().getInputStreamSchema(value.f0).getStreamSerializer().getRow(value.f1), timestamp);
+ }
+
+ @Override
+ public String getStreamId(Tuple2<String, IN> record) {
+ return record.f0;
+ }
+
+ @Override
+ protected void snapshotQueueState(PriorityQueue<StreamRecord<Tuple2<String, IN>>> queue, DataOutputView dataOutputView) throws IOException {
+ dataOutputView.writeInt(queue.size());
+ for (StreamRecord<Tuple2<String, IN>> record : queue) {
+ String streamId = record.getValue().f0;
+ dataOutputView.writeUTF(streamId);
+ this.getStreamRecordSerializer(streamId).serialize(record, dataOutputView);
+ }
+ }
+
+ @Override
+ protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
+ int sizeOfQueue = dataInputView.readInt();
+ PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<>(sizeOfQueue);
+ for (int i = 0; i < sizeOfQueue; i++) {
+ String streamId = dataInputView.readUTF();
+ StreamElement streamElement = getStreamRecordSerializer(streamId).deserialize(dataInputView);
+ priorityQueue.offer(streamElement.<Tuple2<String, IN>>asRecord());
+ }
+ return priorityQueue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
new file mode 100755
index 0000000..7af37ce
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,
+ * according to output {@link TypeInformation} and siddhi schema {@link AbstractDefinition}
+ */
+public class StreamInMemOutputHandler<R> extends StreamCallback {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamInMemOutputHandler.class);
+
+ private final AbstractDefinition definition;
+ private final TypeInformation<R> typeInfo;
+ private final ObjectMapper objectMapper;
+
+
+ private final LinkedList<StreamRecord<R>> collectedRecords;
+
+ public StreamInMemOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition) {
+ this.typeInfo = typeInfo;
+ this.definition = definition;
+ this.objectMapper = new ObjectMapper();
+ this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ this.collectedRecords = new LinkedList<>();
+ }
+
+ @Override
+ public void receive(Event[] events) {
+ for (Event event : events) {
+ if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
+ collectedRecords.add(new StreamRecord<R>((R) toMap(event), event.getTimestamp()));
+ } else if (typeInfo.isTupleType()) {
+ Tuple tuple = this.toTuple(event);
+ collectedRecords.add(new StreamRecord<R>((R) tuple, event.getTimestamp()));
+ } else if (typeInfo instanceof PojoTypeInfo) {
+ R obj;
+ try {
+ obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
+ } catch (IllegalArgumentException ex) {
+ LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
+ throw ex;
+ }
+ collectedRecords.add(new StreamRecord<R>(obj, event.getTimestamp()));
+ } else {
+ throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
+ }
+ }
+ }
+
+
+ @Override
+ public synchronized void stopProcessing() {
+ super.stopProcessing();
+ this.collectedRecords.clear();
+ }
+
+ private Map<String, Object> toMap(Event event) {
+ Map<String, Object> map = new LinkedHashMap<>();
+ for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
+ map.put(definition.getAttributeNameArray()[i], event.getData(i));
+ }
+ return map;
+ }
+
+ private <T extends Tuple> T toTuple(Event event) {
+ return SiddhiTupleFactory.newTuple(event.getData());
+ }
+
+ public LinkedList<StreamRecord<R>> getCollectedRecords() {
+ return collectedRecords;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
new file mode 100755
index 0000000..8840dac
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,
+ * according to output {@link TypeInformation} and siddhi schema {@link AbstractDefinition}
+ */
+public class StreamOutputHandler<R> extends StreamCallback {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamOutputHandler.class);
+
+ private final AbstractDefinition definition;
+ private final Output<StreamRecord<R>> output;
+ private final TypeInformation<R> typeInfo;
+ private final ObjectMapper objectMapper;
+
+ public StreamOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition, Output<StreamRecord<R>> output) {
+ this.typeInfo = typeInfo;
+ this.definition = definition;
+ this.output = output;
+ this.objectMapper = new ObjectMapper();
+ this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ @Override
+ public void receive(Event[] events) {
+ StreamRecord<R> reusableRecord = new StreamRecord<>(null, 0L);
+ for (Event event : events) {
+ if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
+ reusableRecord.replace(toMap(event), event.getTimestamp());
+ output.collect(reusableRecord);
+ } else if (typeInfo.isTupleType()) {
+ Tuple tuple = this.toTuple(event);
+ reusableRecord.replace(tuple, event.getTimestamp());
+ output.collect(reusableRecord);
+ } else if (typeInfo instanceof PojoTypeInfo) {
+ R obj;
+ try {
+ obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
+ } catch (IllegalArgumentException ex) {
+ LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
+ throw ex;
+ }
+ reusableRecord.replace(obj, event.getTimestamp());
+ output.collect(reusableRecord);
+ } else {
+ throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
+ }
+ }
+ }
+
+
+ @Override
+ public synchronized void stopProcessing() {
+ super.stopProcessing();
+ }
+
+ private Map<String, Object> toMap(Event event) {
+ Map<String, Object> map = new LinkedHashMap<>();
+ for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
+ map.put(definition.getAttributeNameArray()[i], event.getData(i));
+ }
+ return map;
+ }
+
+ private <T extends Tuple> T toTuple(Event event) {
+ return SiddhiTupleFactory.newTuple(event.getData());
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
new file mode 100644
index 0000000..049681c
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Stream Record Timestamp Comparator
+ */
+public class StreamRecordComparator<IN> implements Comparator<StreamRecord<IN>>, Serializable {
+ private static final long serialVersionUID = 1581054988433915305L;
+
+ @Override
+ public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+ if (o1.getTimestamp() < o2.getTimestamp()) {
+ return -1;
+ } else if (o1.getTimestamp() > o2.getTimestamp()) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
new file mode 100644
index 0000000..6b1ceae
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <h1> Features </h1>
+ * <ul>
+ * <li>
+ * Integrate Siddhi CEP as an stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
+ * <ul>
+ * <li>Filter</li>
+ * <li>Join</li>
+ * <li>Aggregation</li>
+ * <li>Group by</li>
+ * <li>Having</li>
+ * <li>Window</li>
+ * <li>Conditions and Expressions</li>
+ * <li>Pattern processing</li>
+ * <li>Sequence processing</li>
+ * <li>Event Tables</li>
+ * <li>...</li>
+ * </ul>
+ * </li>
+ * <li>
+ * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`)
+ * <ul>
+ * <li>Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.</li>
+ * <li>Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan</li>
+ * <li>Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema</li>
+ * </ul>
+ * </li>
+ * <li>
+ * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
+ * </li>
+ * <li>
+ * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
+ * </li>
+ * </ul>
+ * <p/>
+ * <h1>Example</h1>
+ * <pre>
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+ *
+ * cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
+ *
+ * cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
+ * cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");
+ *
+ * DataStream<Tuple4<Integer,String,Integer,String>> output = cep
+ * .from("inputStream1").union("inputStream2")
+ * .cql(
+ * "from every s1 = inputStream1[id == 2] "
+ * + " -> s2 = inputStream2[id == 3] "
+ * + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
+ * + "insert into outputStream"
+ * )
+ * .returns("outputStream");
+ *
+ * env.execute();
+ * </pre>
+ *
+ * @see <a href="https://github.com/wso2/siddhi">https://github.com/wso2/siddhi</a>
+ */
+package org.apache.flink.streaming.siddhi;
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
new file mode 100644
index 0000000..2a3a04c
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Siddhi specific Stream Schema.
+ *
+ * @param <T> Siddhi stream element type
+ */
+public class SiddhiStreamSchema<T> extends StreamSchema<T> {
+ private static final String DEFINE_STREAM_TEMPLATE = "define stream %s (%s);";
+
+ public SiddhiStreamSchema(TypeInformation<T> typeInfo, String... fieldNames) {
+ super(typeInfo, fieldNames);
+ }
+
+ public SiddhiStreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+ super(typeInfo, fieldIndexes, fieldNames);
+ }
+
+ public StreamDefinition getStreamDefinition(String streamId) {
+ StreamDefinition streamDefinition = StreamDefinition.id(streamId);
+ for (int i = 0; i < getFieldNames().length; i++) {
+ streamDefinition.attribute(getFieldNames()[i], SiddhiTypeFactory.getAttributeType(getFieldTypes()[i]));
+ }
+ return streamDefinition;
+ }
+
+ public String getStreamDefinitionExpression(StreamDefinition streamDefinition) {
+ List<String> columns = new ArrayList<>();
+ Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
+ for (Attribute attribute : streamDefinition.getAttributeList()) {
+ columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
+ }
+ return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
+ }
+
+ public String getStreamDefinitionExpression(String streamId) {
+ StreamDefinition streamDefinition = getStreamDefinition(streamId);
+ List<String> columns = new ArrayList<>();
+ Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
+ for (Attribute attribute : streamDefinition.getAttributeList()) {
+ columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
+ }
+ return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
+ }
+}