Posted to commits@bahir.apache.org by rm...@apache.org on 2017/11/26 15:13:27 UTC

[2/3] bahir-flink git commit: [BAHIR-144] Add flink-library-siddhi

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java
new file mode 100644
index 0000000..c851631
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Generic Field-based Stream Schema
+ *
+ * @param <T> Stream element type
+ */
+public class StreamSchema<T> implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSchema.class);
+    private final TypeInformation<T> typeInfo;
+    private final int[] fieldIndexes;
+    private final String[] fieldNames;
+    private TypeInformation[] fieldTypes;
+    private final StreamSerializer<T> streamSerializer;
+    private TypeSerializer<T> typeSerializer;
+
+    public StreamSchema(TypeInformation<T> typeInfo, String... fieldNames) {
+        Preconditions.checkNotNull(fieldNames, "Field names are required");
+        this.typeInfo = typeInfo;
+        this.fieldNames = fieldNames;
+        this.fieldIndexes = getFieldIndexes(typeInfo, fieldNames);
+        this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames);
+        this.streamSerializer = new StreamSerializer<>(this);
+    }
+
+    public StreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+        this.typeInfo = typeInfo;
+        this.fieldIndexes = fieldIndexes;
+        this.fieldNames = fieldNames;
+        this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames);
+        this.streamSerializer = new StreamSerializer<>(this);
+    }
+
+    public boolean isAtomicType() {
+        return typeInfo instanceof AtomicType;
+    }
+
+    public boolean isTupleType() {
+        return typeInfo instanceof TupleTypeInfo;
+    }
+
+    public boolean isPojoType() {
+        return typeInfo instanceof PojoTypeInfo;
+    }
+
+    public boolean isCaseClassType() {
+        return typeInfo instanceof CaseClassTypeInfo;
+    }
+
+    public boolean isCompositeType() {
+        return typeInfo instanceof CompositeType;
+    }
+
+    private <E> int[] getFieldIndexes(TypeInformation<E> typeInfo, String... fieldNames) {
+        int[] result;
+        if (isAtomicType()) {
+            result = new int[]{0};
+        } else if (isTupleType()) {
+            result = new int[fieldNames.length];
+            for (int i = 0; i < fieldNames.length; i++) {
+                result[i] = i;
+            }
+        } else if (isPojoType()) {
+            result = new int[fieldNames.length];
+            for (int i = 0; i < fieldNames.length; i++) {
+                int index = ((PojoTypeInfo) typeInfo).getFieldIndex(fieldNames[i]);
+                if (index < 0) {
+                    throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo);
+                }
+                result[i] = index;
+            }
+        } else if (isCaseClassType()) {
+            result = new int[fieldNames.length];
+            for (int i = 0; i < fieldNames.length; i++) {
+                int index = ((CaseClassTypeInfo) typeInfo).getFieldIndex(fieldNames[i]);
+                if (index < 0) {
+                    throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo);
+                }
+                result[i] = index;
+            }
+        } else {
+            throw new IllegalArgumentException("Failed to get field index from " + typeInfo);
+        }
+        return result;
+    }
+
+
+    private <E> TypeInformation[] getFieldTypes(TypeInformation<E> typeInfo, int[] fieldIndexes, String[] fieldNames) {
+        TypeInformation[] fieldTypes;
+        if (isCompositeType()) {
+            CompositeType cType = (CompositeType) typeInfo;
+            if (fieldNames.length != cType.getArity()) {
+                LOGGER.warn("Arity of type (" + cType.getArity() + ") "
+                    + "is not equal to the number of field names (" + fieldNames.length + ").");
+            }
+            fieldTypes = new TypeInformation[fieldIndexes.length];
+            for (int i = 0; i < fieldIndexes.length; i++) {
+                fieldTypes[i] = cType.getTypeAt(fieldIndexes[i]);
+            }
+        } else if (isAtomicType()) {
+            if (fieldIndexes.length != 1 || fieldIndexes[0] != 0) {
+                throw new IllegalArgumentException(
+                    "Non-composite input type may have only a single field and its index must be 0.");
+            }
+            fieldTypes = new TypeInformation[]{typeInfo};
+        } else {
+            throw new IllegalArgumentException("Failed to get field types from " + typeInfo);
+        }
+        return fieldTypes;
+    }
+
+    public TypeInformation<T> getTypeInfo() {
+        return typeInfo;
+    }
+
+    public int[] getFieldIndexes() {
+        return fieldIndexes;
+    }
+
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    public TypeInformation[] getFieldTypes() {
+        return fieldTypes;
+    }
+
+    public StreamSerializer<T> getStreamSerializer() {
+        return streamSerializer;
+    }
+
+    public TypeSerializer<T> getTypeSerializer() {
+        return typeSerializer;
+    }
+
+    public void setTypeSerializer(TypeSerializer<T> typeSerializer) {
+        this.typeSerializer = typeSerializer;
+    }
+}
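
A minimal usage sketch of the schema, assuming the Event POJO defined in the
test sources of this patch:

    // Build a schema over a POJO stream; the declared field names are
    // resolved reflectively against Event's fields.
    TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
    StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "name", "price");
    int[] indexes = schema.getFieldIndexes(); // one index per declared field name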

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java
new file mode 100644
index 0000000..760afbe
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+
+/**
+ * Stream Serialization and Field Extraction Methods.
+ */
+public class StreamSerializer<T> implements Serializable {
+    private final StreamSchema<T> schema;
+
+    public StreamSerializer(StreamSchema<T> schema) {
+        this.schema = schema;
+    }
+
+    public Object[] getRow(T input) {
+        Preconditions.checkArgument(input.getClass() == schema.getTypeInfo().getTypeClass(),
+            "Invalid input type: " + input + ", expected: " + schema.getTypeInfo());
+
+        Object[] data;
+        if (schema.isAtomicType()) {
+            data = new Object[]{input};
+        } else if (schema.isTupleType()) {
+            Tuple tuple = (Tuple) input;
+            data = new Object[schema.getFieldIndexes().length];
+            for (int i = 0; i < schema.getFieldIndexes().length; i++) {
+                data[i] = tuple.getField(schema.getFieldIndexes()[i]);
+            }
+        } else if (schema.isPojoType() || schema.isCaseClassType()) {
+            data = new Object[schema.getFieldIndexes().length];
+            for (int i = 0; i < schema.getFieldNames().length; i++) {
+                data[i] = getFieldValue(schema.getFieldNames()[i], input);
+            }
+        } else {
+            throw new IllegalArgumentException("Failed to get field values from " + schema.getTypeInfo());
+        }
+        return data;
+    }
+
+    private Object getFieldValue(String fieldName, T input) {
+        // TODO: Cache Field Accessor
+        Field field = TypeExtractor.getDeclaredField(schema.getTypeInfo().getTypeClass(), fieldName);
+        if (field == null) {
+            throw new IllegalArgumentException(fieldName + " is not found in " + schema.getTypeInfo());
+        }
+        if (!field.isAccessible()) {
+            field.setAccessible(true);
+        }
+        try {
+            return field.get(input);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
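
A short sketch of how the serializer flattens an element into a positional
row, again assuming the Event POJO from the tests in this patch:

    StreamSchema<Event> schema = new StreamSchema<>(
        TypeExtractor.createTypeInfo(Event.class), "id", "name", "price");
    StreamSerializer<Event> serializer = schema.getStreamSerializer();
    // Reads the declared fields reflectively, in schema order.
    Object[] row = serializer.getRow(Event.of(1, "start", 1.0));
    // row -> {1, "start", 1.0}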

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
new file mode 100644
index 0000000..20ca535
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Converts a Siddhi execution plan into a Siddhi stream operator and builds the output DataStream.
+ */
+public class SiddhiStreamFactory {
+    @SuppressWarnings("unchecked")
+    public static <OUT> DataStream<OUT> createDataStream(SiddhiOperatorContext context, DataStream<Tuple2<String, Object>> namedStream) {
+        return namedStream.transform(context.getName(), context.getOutputStreamType(), new SiddhiStreamOperator(context));
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
new file mode 100644
index 0000000..88c15eb
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Siddhi Tuple Utility methods
+ */
+public class SiddhiTupleFactory {
+    /**
+     * Convert an object array to a Tuple{N}, where N is between 0 and 25.
+     *
+     * @throws IllegalArgumentException if the row's length is greater than 25
+     */
+    public static <T extends Tuple> T newTuple(Object[] row) {
+        Preconditions.checkNotNull(row, "Tuple row is null");
+        switch (row.length) {
+            case 0:
+                return setTupleValue(new Tuple0(), row);
+            case 1:
+                return setTupleValue(new Tuple1(), row);
+            case 2:
+                return setTupleValue(new Tuple2(), row);
+            case 3:
+                return setTupleValue(new Tuple3(), row);
+            case 4:
+                return setTupleValue(new Tuple4(), row);
+            case 5:
+                return setTupleValue(new Tuple5(), row);
+            case 6:
+                return setTupleValue(new Tuple6(), row);
+            case 7:
+                return setTupleValue(new Tuple7(), row);
+            case 8:
+                return setTupleValue(new Tuple8(), row);
+            case 9:
+                return setTupleValue(new Tuple9(), row);
+            case 10:
+                return setTupleValue(new Tuple10(), row);
+            case 11:
+                return setTupleValue(new Tuple11(), row);
+            case 12:
+                return setTupleValue(new Tuple12(), row);
+            case 13:
+                return setTupleValue(new Tuple13(), row);
+            case 14:
+                return setTupleValue(new Tuple14(), row);
+            case 15:
+                return setTupleValue(new Tuple15(), row);
+            case 16:
+                return setTupleValue(new Tuple16(), row);
+            case 17:
+                return setTupleValue(new Tuple17(), row);
+            case 18:
+                return setTupleValue(new Tuple18(), row);
+            case 19:
+                return setTupleValue(new Tuple19(), row);
+            case 20:
+                return setTupleValue(new Tuple20(), row);
+            case 21:
+                return setTupleValue(new Tuple21(), row);
+            case 22:
+                return setTupleValue(new Tuple22(), row);
+            case 23:
+                return setTupleValue(new Tuple23(), row);
+            case 24:
+                return setTupleValue(new Tuple24(), row);
+            case 25:
+                return setTupleValue(new Tuple25(), row);
+            default:
+                throw new IllegalArgumentException("Too long row: " + row.length + ", unable to convert to Tuple");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T extends Tuple> T setTupleValue(Tuple tuple, Object[] row) {
+        if (row.length != tuple.getArity()) {
+            throw new IllegalArgumentException("Row length" + row.length + " is not equal with tuple's arity: " + tuple.getArity());
+        }
+        for (int i = 0; i < row.length; i++) {
+            tuple.setField(row[i], i);
+        }
+        return (T) tuple;
+    }
+}
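
A one-line sketch of the conversion this factory performs:

    // Positional row -> Flink tuple of matching arity (unchecked assignment).
    Tuple3<Integer, String, Double> t =
        SiddhiTupleFactory.newTuple(new Object[]{1, "start", 1.0});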

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
new file mode 100644
index 0000000..22405c9
--- /dev/null
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Siddhi Type Utils for conversion between Java Type, Siddhi Field Type, Stream Definition, and Flink Type Information.
+ */
+public class SiddhiTypeFactory {
+    private static final Map<Class<?>, Attribute.Type> JAVA_TO_SIDDHI_TYPE = new HashMap<>();
+    private static final Map<Attribute.Type, Class<?>> SIDDHI_TO_JAVA_TYPE = new HashMap<>();
+
+    static {
+        registerType(String.class, Attribute.Type.STRING);
+        registerType(Integer.class, Attribute.Type.INT);
+        registerType(int.class, Attribute.Type.INT);
+        registerType(Long.class, Attribute.Type.LONG);
+        registerType(long.class, Attribute.Type.LONG);
+        registerType(Float.class, Attribute.Type.FLOAT);
+        registerType(float.class, Attribute.Type.FLOAT);
+        registerType(Double.class, Attribute.Type.DOUBLE);
+        registerType(double.class, Attribute.Type.DOUBLE);
+        registerType(Boolean.class, Attribute.Type.BOOL);
+        registerType(boolean.class, Attribute.Type.BOOL);
+    }
+
+    public static void registerType(Class<?> javaType, Attribute.Type siddhiType) {
+        if (JAVA_TO_SIDDHI_TYPE.containsKey(javaType)) {
+            throw new IllegalArgumentException("Java type: " + javaType + " or siddhi type: " + siddhiType + " were already registered");
+        }
+        JAVA_TO_SIDDHI_TYPE.put(javaType, siddhiType);
+        SIDDHI_TO_JAVA_TYPE.put(siddhiType, javaType);
+    }
+
+    public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId) {
+        SiddhiManager siddhiManager = null;
+        SiddhiAppRuntime runtime = null;
+        try {
+            siddhiManager = new SiddhiManager();
+            runtime = siddhiManager.createSiddhiAppRuntime(executionPlan);
+            Map<String, StreamDefinition> definitionMap = runtime.getStreamDefinitionMap();
+            if (definitionMap.containsKey(streamId)) {
+                return definitionMap.get(streamId);
+            } else {
+                throw new IllegalArgumentException("Unknown stream id" + streamId);
+            }
+        } finally {
+            if (runtime != null) {
+                runtime.shutdown();
+            }
+            if (siddhiManager != null) {
+                siddhiManager.shutdown();
+            }
+        }
+    }
+
+    public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(AbstractDefinition definition) {
+        int tupleSize = definition.getAttributeList().size();
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("Tuple").append(tupleSize);
+        stringBuilder.append("<");
+        List<String> attributeTypes = new ArrayList<>();
+        for (Attribute attribute : definition.getAttributeList()) {
+            attributeTypes.add(getJavaType(attribute.getType()).getName());
+        }
+        stringBuilder.append(StringUtils.join(attributeTypes, ","));
+        stringBuilder.append(">");
+        try {
+            return TypeInfoParser.parse(stringBuilder.toString());
+        } catch (IllegalArgumentException ex) {
+            throw new IllegalArgumentException("Unable to parse " + stringBuilder.toString(), ex);
+        }
+    }
+
+    public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(String executionPlan, String streamId) {
+        return getTupleTypeInformation(getStreamDefinition(executionPlan, streamId));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static final TypeInformation<?> MAP_TYPE_INFORMATION = TypeExtractor.createTypeInfo(new HashMap<String, Object>().getClass());
+
+    public static TypeInformation<Map<String, Object>> getMapTypeInformation() {
+        return (TypeInformation<Map<String, Object>>) MAP_TYPE_INFORMATION;
+    }
+
+    public static <F> Attribute.Type getAttributeType(TypeInformation<F> fieldType) {
+        if (JAVA_TO_SIDDHI_TYPE.containsKey(fieldType.getTypeClass())) {
+            return JAVA_TO_SIDDHI_TYPE.get(fieldType.getTypeClass());
+        } else {
+            return Attribute.Type.OBJECT;
+        }
+    }
+
+    public static Class<?> getJavaType(Attribute.Type attributeType) {
+        if (!SIDDHI_TO_JAVA_TYPE.containsKey(attributeType)) {
+            throw new IllegalArgumentException("Unable to get java type for siddhi attribute type: " + attributeType);
+        }
+        return SIDDHI_TO_JAVA_TYPE.get(attributeType);
+    }
+
+    public static <T> TypeInformation<Tuple2<String, T>> getStreamTupleTypeInformation(TypeInformation<T> typeInformation) {
+        return TypeInfoParser.parse("Tuple2<String," + typeInformation.getTypeClass().getName() + ">");
+    }
+}
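
A sketch of deriving Flink type information from a Siddhi plan, mirroring
the stream definition used in SiddhiSyntaxTest later in this patch:

    String plan = "define stream inStream (name string, value double);"
        + "from inStream insert into outStream";
    // Parses the plan, looks up inStream, and maps the Siddhi attribute
    // types back to Java types, yielding Tuple2<String, Double> type info.
    TypeInformation<Tuple2<String, Double>> type =
        SiddhiTypeFactory.getTupleTypeInformation(plan, "inStream");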

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
new file mode 100755
index 0000000..5c16c71
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.apache.flink.streaming.siddhi.source.RandomEventSource;
+import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
+import org.apache.flink.streaming.siddhi.source.RandomWordSource;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Flink-siddhi library integration test cases
+ */
+public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable {
+
+    @Rule
+    public transient TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Test
+    public void testSimpleWriteAndRead() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.fromElements(
+            Event.of(1, "start", 1.0),
+            Event.of(2, "middle", 2.0),
+            Event.of(3, "end", 3.0),
+            Event.of(4, "start", 4.0),
+            Event.of(5, "middle", 5.0),
+            Event.of(6, "end", 6.0)
+        );
+
+        String path = tempFolder.newFile().toURI().toString();
+        input.transform("transformer", TypeInformation.of(Event.class), new StreamMap<>(new MapFunction<Event, Event>() {
+            @Override
+            public Event map(Event event) throws Exception {
+                return event;
+            }
+        })).writeAsText(path);
+        env.execute();
+        Assert.assertEquals(6, getLineCount(path));
+    }
+
+    @Test
+    public void testSimplePojoStreamAndReturnPojo() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.fromElements(
+            Event.of(1, "start", 1.0),
+            Event.of(2, "middle", 2.0),
+            Event.of(3, "end", 3.0),
+            Event.of(4, "start", 4.0),
+            Event.of(5, "middle", 5.0),
+            Event.of(6, "end", 6.0)
+        );
+
+        DataStream<Event> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price")
+            .cql("from inputStream insert into  outputStream")
+            .returns("outputStream", Event.class);
+        output.print();
+        env.execute();
+    }
+
+    @Test
+    public void testUnboundedPojoSourceAndReturnTuple() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.addSource(new RandomEventSource(5));
+
+        DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returns("outputStream");
+
+        DataStream<Integer> following = output.map(new MapFunction<Tuple4<Long, Integer, String, Double>, Integer>() {
+            @Override
+            public Integer map(Tuple4<Long, Integer, String, Double> value) throws Exception {
+                return value.f1;
+            }
+        });
+        String resultPath = tempFolder.newFile().toURI().toString();
+        following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testUnboundedTupleSourceAndReturnTuple() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Tuple4<Integer, String, Double, Long>> input = env
+            .addSource(new RandomTupleSource(5).closeDelay(1500)).keyBy(1);
+
+        DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returns("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testUnboundedPrimitiveTypeSourceAndReturnTuple() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<String> input = env.addSource(new RandomWordSource(5).closeDelay(1500));
+
+        DataStream<Tuple1<String>> output = SiddhiCEP
+            .define("wordStream", input, "words")
+            .cql("from wordStream select words insert into  outputStream")
+            .returns("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test(expected = InvalidTypesException.class)
+    public void testUnboundedPojoSourceButReturnInvalidTupleType() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.addSource(new RandomEventSource(5).closeDelay(1500));
+
+        DataStream<Tuple5<Long, Integer, String, Double, Long>> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returns("outputStream");
+
+        DataStream<Long> following = output.map(new MapFunction<Tuple5<Long, Integer, String, Double, Long>, Long>() {
+            @Override
+            public Long map(Tuple5<Long, Integer, String, Double, Long> value) throws Exception {
+                return value.f0;
+            }
+        });
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testUnboundedPojoStreamAndReturnMap() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+        DataStream<Event> input = env.addSource(new RandomEventSource(5));
+
+        DataStream<Map<String, Object>> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returnAsMap("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testUnboundedPojoStreamAndReturnPojo() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.addSource(new RandomEventSource(5));
+        input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
+            @Override
+            public long extractAscendingTimestamp(Event element) {
+                return element.getTimestamp();
+            }
+        });
+
+        DataStream<Event> output = SiddhiCEP
+            .define("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
+            .returns("outputStream", Event.class);
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+
+    @Test
+    public void testMultipleUnboundedPojoStreamSimpleUnion() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(2), "input1");
+        DataStream<Event> input2 = env.addSource(new RandomEventSource(2), "input2");
+        DataStream<Event> input3 = env.addSource(new RandomEventSource(2), "input2");
+        DataStream<Event> output = SiddhiCEP
+            .define("inputStream1", input1, "id", "name", "price", "timestamp")
+            .union("inputStream2", input2, "id", "name", "price", "timestamp")
+            .union("inputStream3", input3, "id", "name", "price", "timestamp")
+            .cql(
+                "from inputStream1 select timestamp, id, name, price insert into outputStream;"
+                    + "from inputStream2 select timestamp, id, name, price insert into outputStream;"
+                    + "from inputStream3 select timestamp, id, name, price insert into outputStream;"
+            )
+            .returns("outputStream", Event.class);
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(6, getLineCount(resultPath));
+    }
+
+    /**
+     * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Joins</a>
+     */
+    @Test
+    public void testMultipleUnboundedPojoStreamUnionAndJoinWithWindow() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
+        DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2");
+
+        DataStream<? extends Map> output = SiddhiCEP
+            .define("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp")
+            .union("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp")
+            .cql(
+                "from inputStream1#window.length(5) as s1 "
+                    + "join inputStream2#window.time(500) as s2 "
+                    + "on s1.id == s2.id "
+                    + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+                    + "insert into JoinStream;"
+            )
+            .returnAsMap("JoinStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    /**
+     * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Patterns</a>
+     */
+    @Test
+    public void testUnboundedPojoStreamSimplePatternMatch() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1");
+        DataStream<Event> input2 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input2");
+
+        DataStream<Map<String, Object>> output = SiddhiCEP
+            .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
+            .union("inputStream2", input2.keyBy("name"), "id", "name", "price", "timestamp")
+            .cql(
+                "from every s1 = inputStream1[id == 2] "
+                    + " -> s2 = inputStream2[id == 3] "
+                    + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 "
+                    + "insert into outputStream"
+            )
+            .returnAsMap("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(1, getLineCount(resultPath));
+        compareResultsByLinesInMemory("{id_1=2, name_1=test_event, id_2=3, name_2=test_event}", resultPath);
+    }
+
+    /**
+     * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Sequences</a>
+     */
+    @Test
+    public void testUnboundedPojoStreamSimpleSequences() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1");
+        DataStream<Map<String, Object>> output = SiddhiCEP
+            .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp")
+            .union("inputStream2", input1.keyBy("name"), "id", "name", "price", "timestamp")
+            .cql(
+                "from every s1 = inputStream1[id == 2]+ , "
+                    + "s2 = inputStream2[id == 3]? "
+                    + "within 1000 second "
+                    + "select s1[0].name as n1, s2.name as n2 "
+                    + "insert into outputStream"
+            )
+            .returnAsMap("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(1, getLineCount(resultPath));
+    }
+
+    private static int getLineCount(String resPath) throws IOException {
+        List<String> result = new LinkedList<>();
+        readAllResultLines(result, resPath);
+        return result.size();
+    }
+
+    @Test
+    public void testCustomizeSiddhiFunctionExtension() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input = env.addSource(new RandomEventSource(5));
+
+        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+        cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
+
+        DataStream<Map<String, Object>> output = cep
+            .from("inputStream", input, "id", "name", "price", "timestamp")
+            .cql("from inputStream select timestamp, id, name, custom:plus(price,price) as doubled_price insert into  outputStream")
+            .returnAsMap("outputStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test
+    public void testRegisterStreamAndExtensionWithSiddhiCEPEnvironment() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
+        DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2");
+
+        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+        cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
+
+        cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp");
+        cep.registerStream("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp");
+
+        DataStream<Tuple4<Long, String, Double, Double>> output = cep
+            .from("inputStream1").union("inputStream2")
+            .cql(
+                "from inputStream1#window.length(5) as s1 "
+                    + "join inputStream2#window.time(500) as s2 "
+                    + "on s1.id == s2.id "
+                    + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+                    + "insert into JoinStream;"
+            )
+            .returns("JoinStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+        assertEquals(5, getLineCount(resultPath));
+    }
+
+    @Test(expected = UndefinedStreamException.class)
+    public void testTriggerUndefinedStreamException() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
+
+        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
+        cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp");
+
+        DataStream<Map<String, Object>> output = cep
+            .from("inputStream1").union("inputStream2")
+            .cql(
+                "from inputStream1#window.length(5) as s1 "
+                    + "join inputStream2#window.time(500) as s2 "
+                    + "on s1.id == s2.id "
+                    + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 "
+                    + "insert into JoinStream;"
+            )
+            .returnAsMap("JoinStream");
+
+        String resultPath = tempFolder.newFile().toURI().toString();
+        output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        env.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
new file mode 100644
index 0000000..582f1cd
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.extension;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.wso2.siddhi.core.config.SiddhiAppContext;
+import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.core.util.config.ConfigReader;
+import org.wso2.siddhi.query.api.definition.Attribute;
+
+public class CustomPlusFunctionExtension extends FunctionExecutor {
+    private Attribute.Type returnType;
+
+    /**
+     * The initialization method for FunctionExecutor; it is called before any other method.
+     */
+    @Override
+    protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
+        for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) {
+            Attribute.Type attributeType = expressionExecutor.getReturnType();
+            if (attributeType == Attribute.Type.DOUBLE) {
+                returnType = attributeType;
+
+            } else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) {
+                throw new SiddhiAppCreationException("Plus cannot have parameters with types String or Bool");
+            } else {
+                returnType = Attribute.Type.LONG;
+            }
+        }
+    }
+
+    /**
+     * The main execution method which will be called upon event arrival
+     * when there is more than one function parameter.
+     *
+     * @param data the runtime values of function parameters
+     * @return the function result
+     */
+    @Override
+    protected Object execute(Object[] data) {
+        if (returnType == Attribute.Type.DOUBLE) {
+            double total = 0;
+            for (Object aObj : data) {
+                total += Double.parseDouble(String.valueOf(aObj));
+            }
+
+            return total;
+        } else {
+            long total = 0;
+            for (Object aObj : data) {
+                total += Long.parseLong(String.valueOf(aObj));
+            }
+            return total;
+        }
+    }
+
+    /**
+     * The main execution method which will be called upon event arrival
+     * when there are zero or one function parameter
+     *
+     * @param data null if the function parameter count is zero, otherwise
+     *             the runtime value of the function parameter
+     * @return the function result
+     */
+    @Override
+    protected Object execute(Object data) {
+        if (returnType == Attribute.Type.DOUBLE) {
+            return Double.parseDouble(String.valueOf(data));
+        } else {
+            return Long.parseLong(String.valueOf(data));
+        }
+    }
+
+    @Override
+    public Attribute.Type getReturnType() {
+        return returnType;
+    }
+
+    @Override
+    public Map<String, Object> currentState() {
+        return new HashMap<>();
+    }
+
+    @Override
+    public void restoreState(Map<String, Object> map) {
+
+    }
+}
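
Registration and use of this extension, as exercised by
testCustomizeSiddhiFunctionExtension earlier in this patch:

    SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
    cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
    DataStream<Map<String, Object>> doubled = cep
        .from("inputStream", input, "id", "name", "price", "timestamp")
        .cql("from inputStream select custom:plus(price, price) as doubled_price insert into outputStream")
        .returnAsMap("outputStream");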

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
new file mode 100644
index 0000000..d271c89
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.operator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.wso2.siddhi.core.SiddhiAppRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SiddhiSyntaxTest {
+
+    private SiddhiManager siddhiManager;
+
+    @Before
+    public void setUp() {
+        siddhiManager = new SiddhiManager();
+    }
+
+    @After
+    public void after() {
+        siddhiManager.shutdown();
+    }
+
+    @Test
+    public void testSimplePlan() throws InterruptedException {
+        SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(
+            "define stream inStream (name string, value double);"
+                + "from inStream insert into outStream");
+        runtime.start();
+
+        final List<Object[]> received = new ArrayList<>(3);
+        InputHandler inputHandler = runtime.getInputHandler("inStream");
+        Assert.assertNotNull(inputHandler);
+
+        try {
+            runtime.getInputHandler("unknownStream");
+            Assert.fail("Should throw exception for getting input handler for unknown streamId.");
+        } catch (Exception ex) {
+            // Expected exception for getting input handler for illegal streamId.
+        }
+
+        runtime.addCallback("outStream", new StreamCallback() {
+            @Override
+            public void receive(Event[] events) {
+                for (Event event : events) {
+                    received.add(event.getData());
+                }
+            }
+        });
+
+        inputHandler.send(new Object[]{"a", 1.1});
+        inputHandler.send(new Object[]{"b", 1.2});
+        inputHandler.send(new Object[]{"c", 1.3});
+        Thread.sleep(100);
+        Assert.assertEquals(3, received.size());
+        Assert.assertArrayEquals(new Object[]{"a", 1.1}, received.get(0));
+        Assert.assertArrayEquals(new Object[]{"b", 1.2}, received.get(1));
+        Assert.assertArrayEquals(new Object[]{"c", 1.3}, received.get(2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
new file mode 100644
index 0000000..db05e9d
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.junit.Test;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.StreamDefinition;
+
+import static org.junit.Assert.*;
+
+public class SiddhiExecutionPlanSchemaTest {
+    @Test
+    public void testStreamSchemaWithPojo() {
+        TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+        assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
+
+        SiddhiStreamSchema<Event> schema = new SiddhiStreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(4, schema.getFieldIndexes().length);
+
+        StreamDefinition streamDefinition = schema.getStreamDefinition("test_stream");
+        assertArrayEquals(new String[]{"id", "timestamp", "name", "price"}, streamDefinition.getAttributeNameArray());
+
+        assertEquals(Attribute.Type.INT, streamDefinition.getAttributeType("id"));
+        assertEquals(Attribute.Type.LONG, streamDefinition.getAttributeType("timestamp"));
+        assertEquals(Attribute.Type.STRING, streamDefinition.getAttributeType("name"));
+        assertEquals(Attribute.Type.DOUBLE, streamDefinition.getAttributeType("price"));
+
+        assertEquals("define stream test_stream (id int,timestamp long,name string,price double);", schema.getStreamDefinitionExpression("test_stream"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
new file mode 100644
index 0000000..f876b2b
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StreamSchemaTest {
+    @Test
+    public void testStreamSchemaWithPojo() {
+        TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+        assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
+        StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(4, schema.getFieldIndexes().length);
+        assertEquals(Event.class, schema.getTypeInfo().getTypeClass());
+    }
+
+    @Test
+    public void testStreamSchemaWithTuple() {
+        TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
+        StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
+        assertEquals(4, schema.getFieldIndexes().length);
+    }
+
+    @Test
+    public void testStreamSchemaWithPrimitive() {
+        TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+        StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
+        assertEquals(String.class, schema.getTypeInfo().getTypeClass());
+        assertEquals(1, schema.getFieldIndexes().length);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testStreamSchemaWithPojoAndUnknownField() {
+        TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+        new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price", "unknown");
+    }
+
+    @Test
+    public void testStreamTupleSerializerWithPojo() {
+        TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+        assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
+        StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(Event.class, schema.getTypeInfo().getTypeClass());
+
+        TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        assertEquals("Java Tuple2<String, GenericType<" + Event.class.getName() + ">>", tuple2TypeInformation.toString());
+    }
+
+    @Test
+    public void testStreamTupleSerializerWithTuple() {
+        TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
+        StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
+        TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        assertEquals("Java Tuple2<String, GenericType<" + Tuple4.class.getName() + ">>", tuple2TypeInformation.toString());
+    }
+
+    @Test
+    public void testStreamTupleSerializerWithPrimitive() {
+        TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+        StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
+        assertEquals(String.class, schema.getTypeInfo().getTypeClass());
+        TypeInformation<Tuple2<String, String>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        assertEquals("Java Tuple2<String, String>", tuple2TypeInformation.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java
new file mode 100644
index 0000000..190208c
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.schema;
+
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamSerializerTest {
+    private static final long CURRENT = System.currentTimeMillis();
+
+    @Test
+    public void testSimplePojoRead() {
+        Event event = new Event();
+        event.setId(1);
+        event.setName("test");
+        event.setPrice(56.7);
+        event.setTimestamp(CURRENT);
+
+        StreamSchema<Event> schema = new StreamSchema<>(TypeExtractor.createTypeInfo(Event.class), "id", "name", "price", "timestamp");
+        StreamSerializer<Event> reader = new StreamSerializer<>(schema);
+        Assert.assertArrayEquals(new Object[]{1, "test", 56.7, CURRENT}, reader.getRow(event));
+    }
+}
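
A property of StreamSerializer worth calling out: the row it produces follows the field-name order passed to StreamSchema, not the declaration order inside Event (the test above already relies on this). A minimal sketch of that behaviour; the demo class name is invented, and it sits in the schema package on the assumption that StreamSerializer is visible there, while everything else comes from this patch:

    package org.apache.flink.streaming.siddhi.schema;

    import java.util.Arrays;

    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.streaming.siddhi.source.Event;

    public class RowOrderDemo {
        public static void main(String[] args) {
            Event event = Event.of(1, "test", 56.7, 1234L);
            // Field names decide the row layout: name first, timestamp last.
            StreamSchema<Event> schema = new StreamSchema<>(
                TypeExtractor.createTypeInfo(Event.class), "name", "price", "id", "timestamp");
            StreamSerializer<Event> serializer = new StreamSerializer<>(schema);
            // Prints [test, 56.7, 1, 1234]
            System.out.println(Arrays.toString(serializer.getRow(event)));
        }
    }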

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
new file mode 100644
index 0000000..357e1d2
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.Objects;
+
+public class Event {
+    private long timestamp;
+    private String name;
+    private double price;
+    private int id;
+
+    public double getPrice() {
+        return price;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return "Event(" + id + ", " + name + ", " + price + ", " + timestamp + ")";
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof Event) {
+            Event other = (Event) obj;
+
+            return Objects.equals(name, other.name) && price == other.price && id == other.id && timestamp == other.timestamp;
+        } else {
+            return false;
+        }
+    }
+
+    public static Event of(int id, String name, double price) {
+        Event event = new Event();
+        event.setId(id);
+        event.setName(name);
+        event.setPrice(price);
+        event.setTimestamp(System.currentTimeMillis());
+        return event;
+    }
+
+    public static Event of(int id, String name, double price, long timestamp) {
+        Event event = new Event();
+        event.setId(id);
+        event.setName(name);
+        event.setPrice(price);
+        event.setTimestamp(timestamp);
+        return event;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, price, id, timestamp);
+    }
+
+    public static TypeSerializer<Event> createTypeSerializer() {
+        TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class);
+
+        return typeInformation.createSerializer(new ExecutionConfig());
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public void setPrice(double price) {
+        this.price = price;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
new file mode 100644
index 0000000..bb95fdd
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
+public class RandomEventSource implements SourceFunction<Event> {
+    private final int count;
+    private final Random random;
+    private final long initialTimestamp;
+
+    private volatile boolean isRunning = true;
+    private volatile int number = 0;
+    private volatile long closeDelayTimestamp = 1000;
+
+    public RandomEventSource(int count, long initialTimestamp) {
+        this.count = count;
+        this.random = new Random();
+        this.initialTimestamp = initialTimestamp;
+    }
+
+    public RandomEventSource() {
+        this(Integer.MAX_VALUE, System.currentTimeMillis());
+    }
+
+    public RandomEventSource(int count) {
+        this(count, System.currentTimeMillis());
+    }
+
+    public RandomEventSource closeDelay(long delayTimestamp) {
+        this.closeDelayTimestamp = delayTimestamp;
+        return this;
+    }
+
+    @Override
+    public void run(SourceContext<Event> ctx) throws Exception {
+        while (isRunning) {
+            ctx.collect(Event.of(number, "test_event", random.nextDouble(), initialTimestamp + 1000 * number));
+            number++;
+            if (number >= this.count) {
+                cancel();
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        this.isRunning = false;
+        try {
+            Thread.sleep(closeDelayTimestamp);
+        } catch (InterruptedException e) {
+            // ignored
+        }
+    }
+}
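
Being an ordinary SourceFunction, RandomEventSource drops straight into a streaming job. A minimal wiring sketch; the job class name is invented, the Flink calls are standard API:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.siddhi.source.Event;
    import org.apache.flink.streaming.siddhi.source.RandomEventSource;

    public class RandomEventSourceDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Five events, spaced one second apart in their timestamp field.
            DataStream<Event> events = env.addSource(new RandomEventSource(5));
            events.print();
            env.execute("random-event-source-demo");
        }
    }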

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
new file mode 100644
index 0000000..35121f7
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
+public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String, Double, Long>> {
+    private final int count;
+    private final Random random;
+    private final long initialTimestamp;
+
+    private volatile boolean isRunning = true;
+    private volatile int number = 0;
+    private long closeDelayTimestamp;
+
+    public RandomTupleSource(int count, long initialTimestamp) {
+        this.count = count;
+        this.random = new Random();
+        this.initialTimestamp = initialTimestamp;
+    }
+
+    public RandomTupleSource() {
+        this(Integer.MAX_VALUE, System.currentTimeMillis());
+    }
+
+    public RandomTupleSource(int count) {
+        this(count, System.currentTimeMillis());
+    }
+
+    public RandomTupleSource closeDelay(long delayTimestamp) {
+        this.closeDelayTimestamp = delayTimestamp;
+        return this;
+    }
+
+    @Override
+    public void run(SourceContext<Tuple4<Integer, String, Double, Long>> ctx) throws Exception {
+        while (isRunning) {
+            ctx.collect(Tuple4.of(number, "test_tuple", random.nextDouble(), initialTimestamp + 1000 * number));
+            number++;
+            if (number >= this.count) {
+                cancel();
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        this.isRunning = false;
+        try {
+            Thread.sleep(this.closeDelayTimestamp);
+        } catch (InterruptedException e) {
+            // ignored
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
new file mode 100644
index 0000000..19d904f
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.source;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Random;
+
+public class RandomWordSource implements SourceFunction<String> {
+    private static final String[] WORDS = new String[] {
+        "To be, or not to be,--that is the question:--",
+        "Whether 'tis nobler in the mind to suffer",
+        "The slings and arrows of outrageous fortune",
+        "Or to take arms against a sea of troubles,",
+        "And by opposing end them?--To die,--to sleep,--",
+        "No more; and by a sleep to say we end",
+        "The heartache, and the thousand natural shocks",
+        "That flesh is heir to,--'tis a consummation",
+        "Devoutly to be wish'd. To die,--to sleep;--",
+        "To sleep! perchance to dream:--ay, there's the rub;",
+        "For in that sleep of death what dreams may come,",
+        "When we have shuffled off this mortal coil,",
+        "Must give us pause: there's the respect",
+        "That makes calamity of so long life;",
+        "For who would bear the whips and scorns of time,",
+        "The oppressor's wrong, the proud man's contumely,",
+        "The pangs of despis'd love, the law's delay,",
+        "The insolence of office, and the spurns",
+        "That patient merit of the unworthy takes,",
+        "When he himself might his quietus make",
+        "With a bare bodkin? who would these fardels bear,",
+        "To grunt and sweat under a weary life,",
+        "But that the dread of something after death,--",
+        "The undiscover'd country, from whose bourn",
+        "No traveller returns,--puzzles the will,",
+        "And makes us rather bear those ills we have",
+        "Than fly to others that we know not of?",
+        "Thus conscience does make cowards of us all;",
+        "And thus the native hue of resolution",
+        "Is sicklied o'er with the pale cast of thought;",
+        "And enterprises of great pith and moment,",
+        "With this regard, their currents turn awry,",
+        "And lose the name of action.--Soft you now!",
+        "The fair Ophelia!--Nymph, in thy orisons",
+        "Be all my sins remember'd."
+    };
+
+    private final int count;
+    private final Random random;
+    private final long initialTimestamp;
+
+    private volatile boolean isRunning = true;
+    private volatile int number = 0;
+    private long closeDelayTimestamp;
+
+    public RandomWordSource(int count, long initialTimestamp) {
+        this.count = count;
+        this.random = new Random();
+        this.initialTimestamp = initialTimestamp;
+    }
+
+    public RandomWordSource() {
+        this(Integer.MAX_VALUE, System.currentTimeMillis());
+    }
+
+    public RandomWordSource(int count) {
+        this(count, System.currentTimeMillis());
+    }
+
+    public RandomWordSource closeDelay(long delayTimestamp) {
+        this.closeDelayTimestamp = delayTimestamp;
+        return this;
+    }
+
+    @Override
+    public void run(SourceContext<String> ctx) throws Exception {
+        while (isRunning) {
+            ctx.collectWithTimestamp(WORDS[random.nextInt(WORDS.length)], initialTimestamp + 1000 * number);
+            number++;
+            if (number >= this.count) {
+                cancel();
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        this.isRunning = false;
+        try {
+            Thread.sleep(this.closeDelayTimestamp);
+        } catch (InterruptedException e) {
+            // ignored
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
new file mode 100644
index 0000000..4753a3f
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class SiddhiTupleFactoryTest {
+    @Test
+    public void testConvertObjectArrayToTuple() {
+        Object[] row = new Object[]{1, "message", 1234567L, true, new Object()};
+        Tuple5 tuple5 = SiddhiTupleFactory.newTuple(row);
+        assertEquals(5, tuple5.getArity());
+        assertArrayEquals(row, new Object[]{
+            tuple5.f0,
+            tuple5.f1,
+            tuple5.f2,
+            tuple5.f3,
+            tuple5.f4
+        });
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConvertTooLongObjectArrayToTuple() {
+        // Flink tuples support at most 25 fields (Tuple25), so 26 must be rejected.
+        Object[] row = new Object[26];
+        SiddhiTupleFactory.newTuple(row);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
new file mode 100644
index 0000000..c4a1e8c
--- /dev/null
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SiddhiTypeFactoryTest {
+    @Test
+    public void testTypeInfoParser() {
+        TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>");
+        Assert.assertNotNull(type1);
+        TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">");
+        Assert.assertNotNull(type2);
+    }
+
+    public static class InnerPojo {
+    }
+
+    @Test
+    public void testBuildTypeInformationForSiddhiStream() {
+        String query = "define stream inputStream (timestamp long, name string, value double);"
+            + "from inputStream select name, value insert into outputStream;";
+        TypeInformation<Tuple3<Long, String, Double>> inputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "inputStream");
+        TypeInformation<Tuple2<String, Double>> outputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "outputStream");
+
+        Assert.assertNotNull(inputStreamType);
+        Assert.assertNotNull(outputStreamType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/resources/log4j-test.properties b/flink-library-siddhi/src/test/resources/log4j-test.properties
new file mode 100755
index 0000000..5b1e4ed
--- /dev/null
+++ b/flink-library-siddhi/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Root logger level is set to INFO for debugging purposes;
+# set it to OFF to avoid flooding build logs.
+log4j.rootLogger=INFO, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/resources/logback-test.xml b/flink-library-siddhi/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..b7a5793
--- /dev/null
+++ b/flink-library-siddhi/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+		</encoder>
+	</appender>
+
+	<root level="WARN">
+		<appender-ref ref="STDOUT"/>
+	</root>
+
+	<logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+	<logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+	<logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+	<logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>