You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/10/20 20:47:14 UTC

[incubator-streampipes] 03/03: [STREAMPIPES-250] Refactor Siddhi Wrapper

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 8c5beb86f5c5e5cf36d39476b7e042674bf2d296
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Tue Oct 20 22:40:39 2020 +0200

    [STREAMPIPES-250] Refactor Siddhi Wrapper
---
 streampipes-wrapper-siddhi/pom.xml                 |  6 ++
 .../wrapper/siddhi/constants/SiddhiConstants.java  |  2 +
 .../wrapper/siddhi/engine/SiddhiEngine.java        | 23 ++++---
 .../wrapper/siddhi/engine/SiddhiEventEngine.java   | 55 ++--------------
 .../siddhi/engine/SiddhiInvocationConfig.java      | 74 ----------------------
 .../siddhi/engine/SiddhiStatementGenerator.java    |  7 +-
 .../siddhi/engine/StreamPipesSiddhiProcessor.java  |  5 +-
 .../callback/SiddhiOutputStreamCallback.java       | 17 +++--
 .../engine/generator/EventTypeGenerator.java       |  8 ++-
 .../engine/generator/SiddhiAppGenerator.java       | 22 +++----
 .../generator/SiddhiInvocationConfigGenerator.java | 65 +++++++++++++++++++
 .../wrapper/siddhi/model/EventType.java            | 27 ++++----
 .../SiddhiProcessorParams.java}                    | 72 ++++++++-------------
 .../AbstractQueryGenerator.java}                   | 19 ++++--
 .../FromClause.java}                               | 13 +---
 .../SelectClause.java}                             | 16 +++--
 .../EventType.java => query/SiddhiStatement.java}  | 29 ++++-----
 .../expression/Expression.java}                    | 17 +++--
 .../expression/Expressions.java}                   | 18 +++---
 .../expression/PropertyExpression.java}            | 13 +---
 .../wrapper/siddhi/utils/SiddhiUtils.java          | 25 +++++---
 21 files changed, 237 insertions(+), 296 deletions(-)

diff --git a/streampipes-wrapper-siddhi/pom.xml b/streampipes-wrapper-siddhi/pom.xml
index 1a8ce3a..fdbad74 100644
--- a/streampipes-wrapper-siddhi/pom.xml
+++ b/streampipes-wrapper-siddhi/pom.xml
@@ -52,6 +52,12 @@
             <groupId>io.siddhi</groupId>
             <artifactId>siddhi-query-compiler</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.siddhi.extension.execution.list</groupId>
+            <artifactId>siddhi-execution-list</artifactId>
+            <version>1.0.1</version>
+        </dependency>
+
     </dependencies>
 
     <repositories>
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
index 3d2a2a3..146be5a 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
@@ -21,6 +21,8 @@ public class SiddhiConstants {
 
   public static final String SELECT = "select";
   public static final String INSERT = "insert";
+  public static final String WHITESPACE = " ";
+  public static final String ASTERISK = "*";
 
   public static final String FIRST_STREAM_PREFIX = "s0";
   public static final String SECOND_STREAM_PREFIX = "s1";
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
index 173f30f..dcc7eed 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
@@ -21,14 +21,17 @@ import io.siddhi.core.SiddhiAppRuntime;
 import io.siddhi.core.SiddhiManager;
 import io.siddhi.core.stream.input.InputHandler;
 import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.StreamDefinition;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiOutputStreamCallback;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiOutputStreamDebugCallback;
-import org.apache.streampipes.wrapper.siddhi.model.EventType;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
 import org.apache.streampipes.wrapper.siddhi.manager.SpSiddhiManager;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
 import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,18 +63,18 @@ public class SiddhiEngine {
     this.debugMode = true;
   }
 
-  public void initializeEngine(SiddhiInvocationConfig<? extends EventProcessorBindingParams> settings,
+  public void initializeEngine(SiddhiInvocationConfigGenerator<? extends EventProcessorBindingParams> settings,
                                SpOutputCollector spOutputCollector,
                                EventProcessorRuntimeContext runtimeContext) {
 
-    EventProcessorBindingParams params = settings.getParams();
-    this.typeInfo = settings.getEventTypeInfo();
+    EventProcessorBindingParams params = settings.getSiddhiProcessorParams().getParams();
+    this.typeInfo = settings.getSiddhiProcessorParams().getEventTypeInfo();
     SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
 
     //this.timestampField = removeStreamIdFromTimestamp(setTimestamp(parameters));
 
     siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(settings.getSiddhiAppString());
-    settings.getParams()
+    settings.getSiddhiProcessorParams().getParams()
             .getInEventTypes()
             .forEach((key, value) -> {
               String preparedKey = SiddhiUtils.prepareName(key);
@@ -79,14 +82,16 @@ public class SiddhiEngine {
             });
 
     StreamCallback callback;
-
+    Map<String, StreamDefinition> streamDef = siddhiAppRuntime.getStreamDefinitionMap();
+    String outputKey = SiddhiUtils.getPreparedOutputTopicName(params);
+    List<Attribute> streamAttributes = streamDef.get(outputKey).getAttributeList();
     if (!debugMode) {
-      callback = new SiddhiOutputStreamCallback(spOutputCollector, settings.getOutputEventKeys(), runtimeContext);
+      callback = new SiddhiOutputStreamCallback(spOutputCollector, runtimeContext, streamAttributes);
     } else {
       callback = new SiddhiOutputStreamDebugCallback(debugCallback);
     }
 
-    siddhiAppRuntime.addCallback(SiddhiUtils.prepareName(SiddhiUtils.getOutputTopicName(params)), callback);
+    siddhiAppRuntime.addCallback(SiddhiUtils.getPreparedOutputTopicName(params), callback);
     siddhiAppRuntime.start();
   }
 
@@ -97,7 +102,7 @@ public class SiddhiEngine {
       List<String> eventKeys = this.typeInfo
               .get(sourceId)
               .stream()
-              .map(EventType::getEventTypeName)
+              .map(EventType::getFieldName)
               .collect(Collectors.toList());
 
       inputHandler.send(SiddhiUtils.toObjArr(eventKeys, event.getRaw()));
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
index 585ac5a..a6ce6e1 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
@@ -17,29 +17,22 @@
  */
 package org.apache.streampipes.wrapper.siddhi.engine;
 
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
-import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.InputStreamNameGenerator;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
 import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> implements
         EventProcessor<B>, SiddhiStatementGenerator<B> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);
 
   private SiddhiEngine siddhiEngine;
-  private SiddhiInvocationConfig<B> siddhiConfig;
-  private List<String> outputEventKeys = new ArrayList<>();
 
   public SiddhiEventEngine() {
     this.siddhiEngine = new SiddhiEngine();
@@ -51,12 +44,9 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
 
   @Override
   public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
-    List<String> inputStreamNames = new InputStreamNameGenerator<>(parameters).generateInputStreamNames();
-    this.outputEventKeys = new ArrayList<>(parameters.getOutEventType().keySet());
-    String fromStatement = fromStatement(inputStreamNames, parameters);
-    String selectStatement = selectStatement(outputEventKeys, parameters);
-    this.siddhiConfig = new SiddhiInvocationConfig<>(parameters, fromStatement, selectStatement, inputStreamNames);
-    this.siddhiEngine.initializeEngine(this.siddhiConfig, spOutputCollector, runtimeContext);
+    SiddhiInvocationConfigGenerator<B> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
+            this::fromStatement, this::selectStatement);
+    this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
   }
 
   @Override
@@ -69,45 +59,8 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
     this.siddhiEngine.shutdownEngine();
   }
 
-  public SiddhiInvocationConfig<B> getSiddhiConfig() {
-    return this.siddhiConfig;
-  }
-
   public String prepareName(String name) {
     return SiddhiUtils.prepareName(name);
   }
 
-  public String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
-    StringBuilder selectString = new StringBuilder();
-    selectString.append(SiddhiConstants.SELECT).append(" ");
-
-    if (outputEventKeys.size() > 0) {
-      for (int i=0; i<outputEventKeys.size() - 1; i++) {
-        selectString
-                .append(SiddhiConstants.FIRST_STREAM_PREFIX)
-                .append(outputEventKeys.get(i))
-                .append(",");
-      }
-      selectString
-              .append(SiddhiConstants.FIRST_STREAM_PREFIX)
-              .append(outputEventKeys.get(outputEventKeys.size() - 1));
-    }
-    return selectString.toString();
-  }
-
-  public String getCustomOutputSelectStatement(DataProcessorInvocation invocation,
-                                               String eventName) {
-    StringBuilder selectString = new StringBuilder();
-    selectString.append(SiddhiConstants.SELECT).append(" ");
-
-    if (outputEventKeys.size() > 0) {
-      for (int i = 0; i < outputEventKeys.size() - 1; i++) {
-        selectString.append(eventName).append(".s0").append(outputEventKeys.get(i)).append(",");
-      }
-      selectString.append(eventName).append(".s0").append(outputEventKeys.get(outputEventKeys.size() - 1));
-
-    }
-
-    return selectString.toString();
-  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java
deleted file mode 100644
index 62eb29c..0000000
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.wrapper.siddhi.engine;
-
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.EventTypeGenerator;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiAppGenerator;
-import org.apache.streampipes.wrapper.siddhi.model.EventType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class SiddhiInvocationConfig<B extends EventProcessorBindingParams> {
-
-  private final B params;
-  private final String siddhiAppString;
-  private final List<String> inputStreamNames;
-  private final Map<String, List<EventType>> eventTypeInfo;
-  private final List<String> outputEventKeys;
-
-  public SiddhiInvocationConfig(B params,
-                                String fromStatement,
-                                String selectStatement,
-                                List<String> inputStreamNames) {
-    this.params = params;
-    this.inputStreamNames = inputStreamNames;
-    this.eventTypeInfo = new EventTypeGenerator<>(params).generateEventTypes();
-    this.siddhiAppString = new SiddhiAppGenerator<>(params, inputStreamNames, eventTypeInfo, fromStatement, selectStatement)
-            .generateSiddhiApp();
-    this.outputEventKeys = new ArrayList<>(this.params.getOutEventType().keySet());
-  }
-
-  public B getParams() {
-    return params;
-  }
-
-  public String getSiddhiAppString() {
-    return siddhiAppString;
-  }
-
-  public List<String> getInputStreamNames() {
-    return inputStreamNames;
-  }
-
-  public Map<String, List<EventType>> getEventTypeInfo() {
-    return eventTypeInfo;
-  }
-
-  public List<String> getOutputEventKeys() {
-    return this.outputEventKeys;
-  }
-
-  public String getSiddhiApp() {
-    return this.siddhiAppString;
-  }
-
-
-}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
index d7adcb9..9abc569 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
@@ -18,13 +18,12 @@
 package org.apache.streampipes.wrapper.siddhi.engine;
 
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
 
 public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
 
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+  String fromStatement(SiddhiProcessorParams<B> siddhiParams);
 
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+  String selectStatement(SiddhiProcessorParams<B> siddhiParams);
 
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
index d603e9a..2aabb4c 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
 import org.apache.streampipes.wrapper.standalone.ProcessorParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
@@ -39,7 +40,9 @@ public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcesso
 
   @Override
   public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
+    SiddhiInvocationConfigGenerator<ProcessorParams> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
+            this::fromStatement, this::selectStatement);
+    this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
   }
 
   @Override
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
index 947e99a..95dcf1e 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.wrapper.siddhi.engine.callback;
 
 import io.siddhi.core.event.Event;
 import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.Attribute;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
@@ -28,24 +29,26 @@ import java.util.List;
 public class SiddhiOutputStreamCallback extends StreamCallback {
 
   private SpOutputCollector collector;
-  private List<String> outputEventKeys;
   private EventProcessorRuntimeContext runtimeContext;
 
+  private List<Attribute> streamAttributes;
+
   public SiddhiOutputStreamCallback(SpOutputCollector collector,
-                                    List<String> outputEventKeys,
-                                    EventProcessorRuntimeContext runtimeContext) {
+                                    EventProcessorRuntimeContext runtimeContext,
+                                    List<Attribute> streamAttributes) {
     this.collector = collector;
-    this.outputEventKeys = outputEventKeys;
     this.runtimeContext = runtimeContext;
+    this.streamAttributes = streamAttributes;
   }
 
   @Override
   public void receive(Event[] events) {
     if (events.length > 0) {
       Event lastEvent = events[events.length - 1];
-      collector.collect(SiddhiUtils.toSpEvent(lastEvent, outputEventKeys,
-              runtimeContext.getOutputSchemaInfo
-                      (), runtimeContext.getOutputSourceInfo()));
+      collector.collect(SiddhiUtils.toSpEvent(lastEvent,
+              runtimeContext.getOutputSchemaInfo(),
+              runtimeContext.getOutputSourceInfo(),
+              streamAttributes));
     }
   }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
index 1aa95a4..7c1d28c 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
@@ -40,7 +40,7 @@ public class EventTypeGenerator<B extends EventProcessorBindingParams> {
       List<EventType> sortedEventKeys = new ArrayList<>();
       for (String propertyKey : value.keySet()) {
         sortedEventKeys.add(makeEventType(currentStreamIndex.get(), propertyKey, value.get(propertyKey)));
-        sortedEventKeys.sort(Comparator.comparing(EventType::getEventTypeName));
+        sortedEventKeys.sort(Comparator.comparing(EventType::getFieldName));
       }
       listOfEventKeys.put(key, sortedEventKeys);
       currentStreamIndex.getAndSet(currentStreamIndex.get() + 1);
@@ -50,7 +50,7 @@ public class EventTypeGenerator<B extends EventProcessorBindingParams> {
   }
 
   private EventType makeEventType(Integer currentStreamIndex, String propertyName, Object propertyType) {
-    return new EventType(currentStreamIndex, propertyName, toType((Class<?>) propertyType));
+    return new EventType(toSelectorPrefix(currentStreamIndex), propertyName, toType((Class<?>) propertyType));
   }
 
   private String toType(Class<?> o) {
@@ -70,4 +70,8 @@ public class EventTypeGenerator<B extends EventProcessorBindingParams> {
       return SiddhiConstants.SIDDHI_OBJECT_TYPE;
     }
   }
+
+  private String toSelectorPrefix(Integer currentStreamIndex) {
+    return currentStreamIndex == 0 ? SiddhiConstants.FIRST_STREAM_PREFIX : SiddhiConstants.SECOND_STREAM_PREFIX;
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
index 0c54f16..b8fd175 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
@@ -18,45 +18,39 @@
 package org.apache.streampipes.wrapper.siddhi.engine.generator;
 
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
 import org.apache.streampipes.wrapper.siddhi.model.EventType;
 import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
 import java.util.StringJoiner;
 
 public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SiddhiAppGenerator.class);
 
-  private B params;
-  private List<String> inputStreamNames;
-  private Map<String, List<EventType>> eventTypes;
+  private final SiddhiProcessorParams<B> siddhiParams;
   private final String fromStatement;
   private final String selectStatement;
 
   private final StringBuilder siddhiAppString;
 
-  public SiddhiAppGenerator(B params,
-                            List<String> inputStreamNames,
-                            Map<String, List<EventType>> eventTypes,
+  public SiddhiAppGenerator(SiddhiProcessorParams<B> siddhiParams,
                             String fromStatement,
                             String selectStatement) {
-    this.params = params;
-    this.inputStreamNames = inputStreamNames;
-    this.eventTypes = eventTypes;
+    this.siddhiParams = siddhiParams;
     this.fromStatement = fromStatement;
     this.selectStatement = selectStatement;
     this.siddhiAppString = new StringBuilder();
   }
 
   public String generateSiddhiApp() {
-    LOG.info("Configuring event types for graph " + params.getGraph().getName());
+    LOG.info("Configuring event types for graph " + this.siddhiParams.getParams().getGraph().getName());
 
-    this.eventTypes.forEach(this::registerEventType);
-    registerStatements(fromStatement, selectStatement, SiddhiUtils.getOutputTopicName(params));
+    this.siddhiParams.getEventTypeInfo().forEach(this::registerEventType);
+    registerStatements(fromStatement, selectStatement, SiddhiUtils.getOutputTopicName(this.siddhiParams.getParams()));
 
     return this.siddhiAppString.toString();
   }
@@ -66,7 +60,7 @@ public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
     StringJoiner joiner = new StringJoiner(",");
 
     eventSchema.forEach(typeInfo -> {
-        joiner.add("s" + typeInfo.getStreamIdentifier() + typeInfo.getEventTypeName() + " " + typeInfo.getEventType());
+      joiner.add(typeInfo.getSelectorPrefix() + typeInfo.getFieldName() + " " + typeInfo.getFieldType());
     });
 
     this.siddhiAppString
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
new file mode 100644
index 0000000..9c25f15
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine.generator;
+
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class SiddhiInvocationConfigGenerator<B extends EventProcessorBindingParams> {
+
+  private final String siddhiAppString;
+  private final String fromStatement;
+  private final String selectStatement;
+
+  private final SiddhiProcessorParams<B> siddhiProcessorParams;
+
+  public SiddhiInvocationConfigGenerator(B params,
+                                         Function<SiddhiProcessorParams<B>, String> fromStatementFunction,
+                                         Function<SiddhiProcessorParams<B>, String> selectStatementFunction) {
+    List<String> inputStreamNames = new InputStreamNameGenerator<>(params).generateInputStreamNames();
+    Map<String, List<EventType>> eventTypeInfo = new EventTypeGenerator<>(params).generateEventTypes();
+    List<String> outputEventKeys = new ArrayList<>(params.getOutEventType().keySet());
+    this.siddhiProcessorParams = new SiddhiProcessorParams<>(params, inputStreamNames, eventTypeInfo, outputEventKeys);
+    this.fromStatement = fromStatementFunction.apply(this.siddhiProcessorParams);
+    this.selectStatement = selectStatementFunction.apply(this.siddhiProcessorParams);
+    this.siddhiAppString = new SiddhiAppGenerator<>(siddhiProcessorParams, fromStatement, selectStatement)
+            .generateSiddhiApp();
+  }
+
+  public String getSiddhiAppString() {
+    return siddhiAppString;
+  }
+
+  public SiddhiProcessorParams<B> getSiddhiProcessorParams() {
+    return siddhiProcessorParams;
+  }
+
+  public String getFromStatement() {
+    return fromStatement;
+  }
+
+  public String getSelectStatement() {
+    return selectStatement;
+  }
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
index 1264caf..d1e324f 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
@@ -19,26 +19,25 @@ package org.apache.streampipes.wrapper.siddhi.model;
 
 public class EventType {
 
-  private Integer streamIdentifier;
-  private String eventTypeName;
-  private String eventType;
+  private String selectorPrefix;
+  private String fieldName;
+  private String fieldType;
 
-  public EventType(Integer streamIdentifier, String eventTypeName, String eventType) {
-    this.streamIdentifier = streamIdentifier;
-    this.eventTypeName = eventTypeName;
-    this.eventType = eventType;
+  public EventType(String selectorPrefix, String fieldName, String fieldType) {
+    this.selectorPrefix = selectorPrefix;
+    this.fieldName = fieldName;
+    this.fieldType = fieldType;
   }
 
-  public Integer getStreamIdentifier() {
-    return streamIdentifier;
+  public String getSelectorPrefix() {
+    return selectorPrefix;
   }
 
-  public String getEventTypeName() {
-    return eventTypeName;
+  public String getFieldName() {
+    return fieldName;
   }
 
-  public String getEventType() {
-    return eventType;
+  public String getFieldType() {
+    return fieldType;
   }
-
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
similarity index 51%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
index 585ac5a..0a2ea57 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
@@ -15,66 +15,46 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.model;
 
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
 import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
-import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.InputStreamNameGenerator;
-import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.List;
-
-public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> implements
-        EventProcessor<B>, SiddhiStatementGenerator<B> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);
-
-  private SiddhiEngine siddhiEngine;
-  private SiddhiInvocationConfig<B> siddhiConfig;
-  private List<String> outputEventKeys = new ArrayList<>();
-
-  public SiddhiEventEngine() {
-    this.siddhiEngine = new SiddhiEngine();
-  }
-
-  public SiddhiEventEngine(SiddhiDebugCallback debugCallback) {
-    this.siddhiEngine = new SiddhiEngine(debugCallback);
-  }
-
-  @Override
-  public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
-    List<String> inputStreamNames = new InputStreamNameGenerator<>(parameters).generateInputStreamNames();
-    this.outputEventKeys = new ArrayList<>(parameters.getOutEventType().keySet());
-    String fromStatement = fromStatement(inputStreamNames, parameters);
-    String selectStatement = selectStatement(outputEventKeys, parameters);
-    this.siddhiConfig = new SiddhiInvocationConfig<>(parameters, fromStatement, selectStatement, inputStreamNames);
-    this.siddhiEngine.initializeEngine(this.siddhiConfig, spOutputCollector, runtimeContext);
+import java.util.Map;
+
+public class SiddhiProcessorParams<B extends EventProcessorBindingParams> {
+
+  private final B params;
+  private final List<String> inputStreamNames;
+  private final Map<String, List<EventType>> eventTypeInfo;
+  private final List<String> outputEventKeys;
+
+  public SiddhiProcessorParams(B params,
+                               List<String> inputStreamNames,
+                               Map<String, List<EventType>> eventTypeInfo,
+                               List<String> outputEventKeys) {
+    this.params = params;
+    this.inputStreamNames = inputStreamNames;
+    this.eventTypeInfo = eventTypeInfo;
+    this.outputEventKeys = outputEventKeys;
   }
 
-  @Override
-  public void onEvent(org.apache.streampipes.model.runtime.Event event, SpOutputCollector collector) {
-    this.siddhiEngine.processEvent(event);
+  public B getParams() {
+    return params;
   }
 
-  @Override
-  public void onDetach() {
-    this.siddhiEngine.shutdownEngine();
+  public List<String> getInputStreamNames() {
+    return inputStreamNames;
   }
 
-  public SiddhiInvocationConfig<B> getSiddhiConfig() {
-    return this.siddhiConfig;
+  public Map<String, List<EventType>> getEventTypeInfo() {
+    return eventTypeInfo;
   }
 
-  public String prepareName(String name) {
-    return SiddhiUtils.prepareName(name);
+  public List<String> getOutputEventKeys() {
+    return outputEventKeys;
   }
 
   public String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java
similarity index 61%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java
index d7adcb9..c7fe3be 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java
@@ -15,16 +15,23 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query;
 
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
 
-import java.util.List;
+import java.util.Arrays;
+import java.util.StringJoiner;
 
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+public abstract class AbstractQueryGenerator {
 
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+  public static String join(String... substrings) {
+    StringJoiner joiner = new StringJoiner(SiddhiConstants.WHITESPACE);
+    Arrays.stream(substrings).forEach(joiner::add);
 
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+    return joiner.toString();
+  }
 
+  public static String fromString(String query) {
+    return null;
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java
similarity index 66%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java
index d7adcb9..6667e89 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java
@@ -15,16 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
-
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
-
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+package org.apache.streampipes.wrapper.siddhi.query;
 
+public class FromClause extends AbstractQueryGenerator {
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java
similarity index 64%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java
index d7adcb9..c126880 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java
@@ -15,16 +15,20 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query;
 
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
+import org.apache.streampipes.wrapper.siddhi.query.expression.Expression;
 
 import java.util.List;
 
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+public class SelectClause extends AbstractQueryGenerator {
 
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+  public static String createWildcard() {
+    return join(SiddhiConstants.SELECT, SiddhiConstants.ASTERISK);
+  }
 
+  public static String create(List<Expression> outputEventProperties) {
+    return null;
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java
similarity index 58%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java
index 1264caf..44bfcf5 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java
@@ -15,30 +15,27 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.model;
+package org.apache.streampipes.wrapper.siddhi.query;
 
-public class EventType {
+public class SiddhiStatement {
 
-  private Integer streamIdentifier;
-  private String eventTypeName;
-  private String eventType;
+  private FromClause fromClause;
+  private SelectClause selectClause;
 
-  public EventType(Integer streamIdentifier, String eventTypeName, String eventType) {
-    this.streamIdentifier = streamIdentifier;
-    this.eventTypeName = eventTypeName;
-    this.eventType = eventType;
+  private SiddhiStatement(FromClause fromClause, SelectClause selectClause) {
+    this.fromClause = fromClause;
+    this.selectClause = selectClause;
   }
 
-  public Integer getStreamIdentifier() {
-    return streamIdentifier;
+  public static SiddhiStatement from(FromClause fromClause, SelectClause selectClause) {
+    return new SiddhiStatement(fromClause, selectClause);
   }
 
-  public String getEventTypeName() {
-    return eventTypeName;
+  public FromClause getFromClause() {
+    return fromClause;
   }
 
-  public String getEventType() {
-    return eventType;
+  public SelectClause getSelectClause() {
+    return selectClause;
   }
-
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java
similarity index 63%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java
index d7adcb9..aa4d10d 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java
@@ -15,16 +15,21 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query.expression;
 
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
 
-import java.util.List;
+import java.util.Arrays;
+import java.util.StringJoiner;
 
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+public abstract class Expression {
 
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+  public String join(String... substrings) {
+    StringJoiner joiner = new StringJoiner(SiddhiConstants.WHITESPACE);
+    Arrays.stream(substrings).forEach(joiner::add);
 
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+    return joiner.toString();
+  }
 
+  public abstract String toSiddhiEpl();
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
similarity index 66%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
index d7adcb9..cf600e1 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query.expression;
 
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+public class Expressions {
 
-import java.util.List;
-
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
-
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+  public static Expression property(String propertyName) {
+    //return new PropertyExpression(propertyName);
+    return null;
+  }
 
+  public static Expression property(String propertyName, String targetName) {
+    return null;
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
similarity index 66%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
index d7adcb9..ceade46 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
@@ -15,16 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
-
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
-
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+package org.apache.streampipes.wrapper.siddhi.query.expression;
 
+public abstract class PropertyExpression extends Expression {
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
index 2292f18..b5bc847 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
@@ -18,10 +18,12 @@
 package org.apache.streampipes.wrapper.siddhi.utils;
 
 import io.siddhi.core.event.Event;
+import io.siddhi.query.api.definition.Attribute;
 import org.apache.streampipes.model.runtime.EventFactory;
 import org.apache.streampipes.model.runtime.SchemaInfo;
 import org.apache.streampipes.model.runtime.SourceInfo;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
 
 import java.util.HashMap;
 import java.util.List;
@@ -29,18 +31,19 @@ import java.util.Map;
 
 public class SiddhiUtils {
 
-  public static org.apache.streampipes.model.runtime.Event toSpEvent(Event event, List<String> outputEventKeys, SchemaInfo
-          schemaInfo, SourceInfo sourceInfo) {
+  public static org.apache.streampipes.model.runtime.Event toSpEvent(Event event,
+                                                                     SchemaInfo schemaInfo,
+                                                                     SourceInfo sourceInfo,
+                                                                     List<Attribute> streamAttributes) {
     Map<String, Object> outMap = new HashMap<>();
 
-    for (int i = 0; i < outputEventKeys.size(); i++) {
-      if (event.getData(i) instanceof List) {
-        List<Object> tmp = (List<Object>) event.getData(i);
-        outMap.put(outputEventKeys.get(i), tmp.get(0));
-      }
-      else {
-        outMap.put(outputEventKeys.get(i), event.getData(i));
+    for (int i = 0; i < streamAttributes.size(); i++) {
+      String outputKey = streamAttributes.get(i).getName();
+      if (outputKey.startsWith(SiddhiConstants.FIRST_STREAM_PREFIX) ||
+              outputKey.startsWith(SiddhiConstants.SECOND_STREAM_PREFIX)) {
+        outputKey = outputKey.substring(2);
       }
+        outMap.put(outputKey, event.getData(i));
     }
     return EventFactory.fromMap(outMap, sourceInfo, schemaInfo);
   }
@@ -54,6 +57,10 @@ public class SiddhiUtils {
     return result;
   }
 
+  public static String getPreparedOutputTopicName(EventProcessorBindingParams params) {
+    return prepareName(getOutputTopicName(params));
+  }
+
   public static String getOutputTopicName(EventProcessorBindingParams parameters) {
     return parameters
             .getGraph()