You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/10/20 20:47:11 UTC

[incubator-streampipes] branch dev updated (36c86b8 -> 8c5beb8)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from 36c86b8  Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
     new 6cdeb31  [STREAMPIPES-250] Refactor Siddhi wrapper
     new 6e1c9ef  [STREAMPIPES-249] Add compact data sink
     new 8c5beb8  [STREAMPIPES-250] Refactor Siddhi Wrapper

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 streampipes-wrapper-siddhi/pom.xml                 |   6 +
 .../wrapper/siddhi/constants/SiddhiConstants.java  |  31 ++-
 .../wrapper/siddhi/engine/SiddhiEngine.java        | 127 ++++++++++
 .../wrapper/siddhi/engine/SiddhiEventEngine.java   | 274 ++-------------------
 .../siddhi/engine/SiddhiStatementGenerator.java    |  11 +-
 .../siddhi/engine/StreamPipesSiddhiProcessor.java  |  57 +++++
 .../engine/{ => callback}/SiddhiDebugCallback.java |   2 +-
 .../callback/SiddhiOutputStreamCallback.java       |  54 ++++
 .../SiddhiOutputStreamDebugCallback.java}          |  24 +-
 .../engine/generator/EventTypeGenerator.java       |  77 ++++++
 .../engine/generator/InputStreamNameGenerator.java |  31 ++-
 .../engine/generator/SiddhiAppGenerator.java       |  86 +++++++
 .../generator/SiddhiInvocationConfigGenerator.java |  65 +++++
 .../wrapper/siddhi/model/EventType.java            |  25 +-
 .../siddhi/model/SiddhiProcessorParams.java        |  93 +++++++
 .../siddhi/query/AbstractQueryGenerator.java       |  27 +-
 .../wrapper/siddhi/query/FromClause.java           |   6 +-
 .../wrapper/siddhi/query/SelectClause.java         |  26 +-
 .../wrapper/siddhi/query/SiddhiStatement.java      |  39 ++-
 .../siddhi/query/expression/Expression.java        |  27 +-
 .../siddhi/query/expression/Expressions.java       |  11 +-
 .../query/expression/PropertyExpression.java       |   6 +-
 .../wrapper/siddhi/utils/SiddhiUtils.java          |  80 ++++++
 .../streampipes/wrapper/standalone/SinkParams.java |  18 +-
 .../wrapper/standalone/StreamPipesDataSink.java    |  16 +-
 25 files changed, 827 insertions(+), 392 deletions(-)
 copy archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java (52%)
 create mode 100644 streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
 copy streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java (72%)
 create mode 100644 streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
 copy streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/{ => callback}/SiddhiDebugCallback.java (93%)
 create mode 100644 streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
 rename streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/{SiddhiDebugCallback.java => callback/SiddhiOutputStreamDebugCallback.java} (55%)
 create mode 100644 streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
 copy archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/InputStreamNameGenerator.java (55%)
 create mode 100644 streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
 create mode 100644 streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
 copy streampipes-model/src/main/java/org/apache/streampipes/model/runtime/SourceInfo.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java (71%)
 create mode 100644 streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
 copy streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Filetypes.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java (63%)
 copy streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java (88%)
 copy streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Filetypes.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java (64%)
 copy ui/src/app/editor/components/pipeline-element-icon-stand/pipeline-element-icon-stand.component.scss => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java (59%)
 copy streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Filetypes.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java (65%)
 copy streampipes-code-generation/src/main/java/org/apache/streampipes/codegeneration/api/IDeclareModelGenerator.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java (73%)
 copy streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSuitableSepasAvailableException.java => streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java (86%)
 create mode 100644 streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java


[incubator-streampipes] 02/03: [STREAMPIPES-249] Add compact data sink

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 6e1c9ef5a06220f5f1cc5a8004b10b929a400f60
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Oct 18 16:03:37 2020 +0200

    [STREAMPIPES-249] Add compact data sink
---
 .../streampipes/wrapper/standalone/SinkParams.java     | 18 +++++++++++++++++-
 .../wrapper/standalone/StreamPipesDataSink.java        | 16 +++++++++++++++-
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/SinkParams.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/SinkParams.java
index b0d234e..31b317b 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/SinkParams.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/SinkParams.java
@@ -16,5 +16,21 @@
  *
  */
 package org.apache.streampipes.wrapper.standalone;
-public class SinkParams {
+
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
+
+public class SinkParams extends EventSinkBindingParams {
+
+  private DataSinkParameterExtractor extractor;
+
+  public SinkParams(DataSinkInvocation graph) {
+    super(graph);
+    this.extractor = DataSinkParameterExtractor.from(graph);
+  }
+
+  public DataSinkParameterExtractor extractor() {
+    return this.extractor;
+  }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
index 59b1241..b087b0b 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
@@ -16,5 +16,19 @@
  *
  */
 package org.apache.streampipes.wrapper.standalone;
-public class StreamPipesDataSink {
+
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.runtime.EventSink;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+
+import java.util.function.Supplier;
+
+public abstract class StreamPipesDataSink extends StandaloneEventSinkDeclarer<SinkParams> implements EventSink<SinkParams> {
+
+  @Override
+  public ConfiguredEventSink<SinkParams> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
+    Supplier<EventSink<SinkParams>> supplier = () -> this;
+    return new ConfiguredEventSink<>(new SinkParams(graph), supplier);
+  }
 }


[incubator-streampipes] 01/03: [STREAMPIPES-250] Refactor Siddhi wrapper

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 6cdeb31bdc5e3f5862f7c96b9bba60ae2a6cf6e6
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Oct 18 16:03:08 2020 +0200

    [STREAMPIPES-250] Refactor Siddhi wrapper
---
 .../SiddhiConstants.java}                          |  16 +-
 .../wrapper/siddhi/engine/SiddhiEngine.java        | 122 +++++++++
 .../wrapper/siddhi/engine/SiddhiEventEngine.java   | 281 ++++-----------------
 .../siddhi/engine/SiddhiInvocationConfig.java      |  74 ++++++
 ...Callback.java => SiddhiStatementGenerator.java} |  10 +-
 .../siddhi/engine/StreamPipesSiddhiProcessor.java  |  54 ++++
 .../engine/{ => callback}/SiddhiDebugCallback.java |   2 +-
 .../callback/SiddhiOutputStreamCallback.java       |  51 ++++
 .../SiddhiOutputStreamDebugCallback.java}          |  24 +-
 .../engine/generator/EventTypeGenerator.java       |  73 ++++++
 .../engine/generator/InputStreamNameGenerator.java |  44 ++++
 .../engine/generator/SiddhiAppGenerator.java       |  92 +++++++
 .../EventType.java}                                |  26 +-
 .../wrapper/siddhi/utils/SiddhiUtils.java          |  73 ++++++
 14 files changed, 689 insertions(+), 253 deletions(-)

diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
similarity index 55%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
index 540d608..3d2a2a3 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
@@ -15,12 +15,20 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.constants;
 
+public class SiddhiConstants {
 
-import io.siddhi.core.event.Event;
+  public static final String SELECT = "select";
+  public static final String INSERT = "insert";
 
-public interface SiddhiDebugCallback {
+  public static final String FIRST_STREAM_PREFIX = "s0";
+  public static final String SECOND_STREAM_PREFIX = "s1";
 
-  void onEvent(Event event);
+  public static final String SIDDHI_LONG_TYPE = "LONG";
+  public static final String SIDDHI_INT_TYPE = "INT";
+  public static final String SIDDHI_DOUBLE_TYPE = "DOUBLE";
+  public static final String SIDDHI_BOOLEAN_TYPE = "BOOL";
+  public static final String SIDDHI_STRING_TYPE = "STRING";
+  public static final String SIDDHI_OBJECT_TYPE = "OBJECT";
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
new file mode 100644
index 0000000..173f30f
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine;
+
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
+import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiOutputStreamCallback;
+import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiOutputStreamDebugCallback;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
+import org.apache.streampipes.wrapper.siddhi.manager.SpSiddhiManager;
+import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SiddhiEngine {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SiddhiEngine.class);
+
+  private SiddhiAppRuntime siddhiAppRuntime;
+  private final Map<String, InputHandler> siddhiInputHandlers;
+  private Map<String, List<EventType>> typeInfo;
+
+  private Boolean debugMode;
+  private SiddhiDebugCallback debugCallback;
+
+  public SiddhiEngine() {
+    this.siddhiInputHandlers = new HashMap<>();
+    this.debugMode = false;
+  }
+
+  public SiddhiEngine(SiddhiDebugCallback debugCallback) {
+    this();
+    this.debugCallback = debugCallback;
+    this.debugMode = true;
+  }
+
+  public void initializeEngine(SiddhiInvocationConfig<? extends EventProcessorBindingParams> settings,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
+
+    EventProcessorBindingParams params = settings.getParams();
+    this.typeInfo = settings.getEventTypeInfo();
+    SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
+
+    //this.timestampField = removeStreamIdFromTimestamp(setTimestamp(parameters));
+
+    siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(settings.getSiddhiAppString());
+    settings.getParams()
+            .getInEventTypes()
+            .forEach((key, value) -> {
+              String preparedKey = SiddhiUtils.prepareName(key);
+              siddhiInputHandlers.put(key, siddhiAppRuntime.getInputHandler(preparedKey));
+            });
+
+    StreamCallback callback;
+
+    if (!debugMode) {
+      callback = new SiddhiOutputStreamCallback(spOutputCollector, settings.getOutputEventKeys(), runtimeContext);
+    } else {
+      callback = new SiddhiOutputStreamDebugCallback(debugCallback);
+    }
+
+    siddhiAppRuntime.addCallback(SiddhiUtils.prepareName(SiddhiUtils.getOutputTopicName(params)), callback);
+    siddhiAppRuntime.start();
+  }
+
+  public void processEvent(org.apache.streampipes.model.runtime.Event event) {
+    try {
+      String sourceId = event.getSourceInfo().getSourceId();
+      InputHandler inputHandler = siddhiInputHandlers.get(sourceId);
+      List<String> eventKeys = this.typeInfo
+              .get(sourceId)
+              .stream()
+              .map(EventType::getEventTypeName)
+              .collect(Collectors.toList());
+
+      inputHandler.send(SiddhiUtils.toObjArr(eventKeys, event.getRaw()));
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void shutdownEngine() {
+    this.siddhiAppRuntime.shutdown();
+  }
+
+//  public void setSortedEventKeys(List<String> sortedEventKeys) {
+//    String streamId = (String) this.listOfEventKeys.keySet().toArray()[0];    // only reliable if there is only one stream, else use changeEventKeys() to respective streamId
+//    changeEventKeys(streamId, sortedEventKeys);
+//  }
+//
+//  public void changeEventKeys(String streamId, List<String> newEventKeys) {
+//    this.listOfEventKeys.put(streamId, newEventKeys);
+//  }
+
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
index a642f6c..585ac5a 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
@@ -17,290 +17,97 @@
  */
 package org.apache.streampipes.wrapper.siddhi.engine;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.runtime.EventFactory;
-import org.apache.streampipes.model.runtime.SchemaInfo;
-import org.apache.streampipes.model.runtime.SourceInfo;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
-import org.apache.streampipes.wrapper.siddhi.manager.SpSiddhiManager;
-import io.siddhi.core.SiddhiAppRuntime;
-import io.siddhi.core.SiddhiManager;
-import io.siddhi.core.event.Event;
-import io.siddhi.core.stream.input.InputHandler;
-import io.siddhi.core.stream.output.StreamCallback;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
+import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.InputStreamNameGenerator;
+import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 
 public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> implements
-        EventProcessor<B> {
-
-  private StringBuilder siddhiAppString;
-
-  private SiddhiAppRuntime siddhiAppRuntime;
-  private Map<String, InputHandler> siddhiInputHandlers;
-  private List<String> inputStreamNames;
-
-  private Map<String, List> listOfEventKeys;
-  private List<String> outputEventKeys;
-
-  private Boolean debugMode;
-  private SiddhiDebugCallback debugCallback;
-
-  private String timestampField;
+        EventProcessor<B>, SiddhiStatementGenerator<B> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);
 
+  private SiddhiEngine siddhiEngine;
+  private SiddhiInvocationConfig<B> siddhiConfig;
+  private List<String> outputEventKeys = new ArrayList<>();
+
   public SiddhiEventEngine() {
-    this.siddhiAppString = new StringBuilder();
-    this.siddhiInputHandlers = new HashMap<>();
-    this.inputStreamNames = new ArrayList<>();
-    listOfEventKeys = new HashMap<>();
-    outputEventKeys = new ArrayList<>();
-    this.debugMode = false;
+    this.siddhiEngine = new SiddhiEngine();
   }
 
   public SiddhiEventEngine(SiddhiDebugCallback debugCallback) {
-    this();
-    this.debugCallback = debugCallback;
-    this.debugMode = true;
+    this.siddhiEngine = new SiddhiEngine(debugCallback);
   }
 
   @Override
   public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
-    if (parameters.getInEventTypes().size() != parameters.getGraph().getInputStreams().size()) {
-      throw new IllegalArgumentException("Input parameters do not match!");
-    }
-
-    SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
-
-    this.timestampField = removeStreamIdFromTimestamp(setTimestamp(parameters));
-
-    LOG.info("Configuring event types for graph " + parameters.getGraph().getName());
-    parameters.getInEventTypes().forEach((key, value) -> {
-      // TODO why is the prefix not in the parameters.getInEventType
-      registerEventTypeIfNotExists(key, value);
-      this.inputStreamNames.add(prepareName(key));
-    });
-
-    LOG.info("Configuring output event keys for graph " + parameters.getGraph().getName());
-    //System.out.println("output key: " + key);
-    outputEventKeys.addAll(parameters.getOutEventType().keySet());
-
+    List<String> inputStreamNames = new InputStreamNameGenerator<>(parameters).generateInputStreamNames();
+    this.outputEventKeys = new ArrayList<>(parameters.getOutEventType().keySet());
     String fromStatement = fromStatement(inputStreamNames, parameters);
-    String selectStatement = selectStatement(parameters);
-    registerStatements(fromStatement, selectStatement, getOutputTopicName(parameters));
-
-    siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiAppString.toString());
-    parameters
-            .getInEventTypes()
-            .forEach((key, value) -> siddhiInputHandlers.put(key, siddhiAppRuntime.getInputHandler(prepareName(key))));
-
-    if (!debugMode) {
-      siddhiAppRuntime.addCallback(prepareName(getOutputTopicName(parameters)), new StreamCallback() {
-        @Override
-        public void receive(Event[] events) {
-          if (events.length > 0) {
-            Event lastEvent = events[events.length - 1];
-            spOutputCollector.collect(toSpEvent(lastEvent, parameters,
-                    runtimeContext.getOutputSchemaInfo
-                    (), runtimeContext.getOutputSourceInfo()));
-          }
-        }
-      });
-    } else {
-      siddhiAppRuntime.addCallback(prepareName(getOutputTopicName(parameters)), new StreamCallback() {
-        @Override
-        public void receive(Event[] events) {
-          LOG.info("Siddhi is firing");
-          if (events.length > 0) {
-            SiddhiEventEngine.this.debugCallback.onEvent(events[events.length - 1]);
-          }
-        }
-      });
-    }
-
-    siddhiAppRuntime.start();
-
-  }
-
-  private String removeStreamIdFromTimestamp(String timestampField) {
-    return timestampField !=null ? timestampField.replaceAll("s0::", "") : null;
-  }
-
-  private String getOutputTopicName(B parameters) {
-    return parameters
-            .getGraph()
-            .getOutputStream()
-            .getEventGrounding()
-            .getTransportProtocol()
-            .getTopicDefinition()
-            .getActualTopicName();
-  }
-
-  private org.apache.streampipes.model.runtime.Event toSpEvent(Event event, B parameters, SchemaInfo
-          schemaInfo, SourceInfo sourceInfo) {
-    Map<String, Object> outMap = new HashMap<>();
-    for (int i = 0; i < outputEventKeys.size(); i++) {
-
-      if (event.getData(i) instanceof LinkedList) {
-        List<Object> tmp = (List<Object>) event.getData(i);
-        outMap.put(outputEventKeys.get(i), tmp.get(0));
-      }
-      else {
-        outMap.put(outputEventKeys.get(i), event.getData(i));
-      }
-
-    }
-    return EventFactory.fromMap(outMap, sourceInfo, schemaInfo);
-  }
-
-
-  private void registerEventTypeIfNotExists(String eventTypeName, Map<String, Object> typeMap) {
-    String defineStreamPrefix = "define stream " + prepareName(eventTypeName);
-    StringJoiner joiner = new StringJoiner(",");
-    int currentNoOfStreams = this.listOfEventKeys.size();
-
-    List<String> sortedEventKeys = new ArrayList<>();
-    for (String key : typeMap.keySet()) {
-      sortedEventKeys.add(key);
-      Collections.sort(sortedEventKeys);
-    }
-
-    listOfEventKeys.put(eventTypeName, sortedEventKeys);
-
-    for (String key : sortedEventKeys) {
-      // TODO: get timestamp field from user params
-      if(key.equalsIgnoreCase(this.timestampField)) {
-        joiner.add("s" + currentNoOfStreams + key + " LONG");
-      }
-      else {
-        joiner.add("s" + currentNoOfStreams + key + " " + toType((Class<?>) typeMap.get(key)));
-      }
-    }
-
-    this.siddhiAppString.append(defineStreamPrefix);
-    this.siddhiAppString.append("(");
-    this.siddhiAppString.append(joiner.toString());
-    this.siddhiAppString.append(");\n");
-  }
-
-  private String toType(Class<?> o) {
-    if (o.equals(Long.class)) {
-      return "LONG";
-    } else if (o.equals(Integer.class)) {
-      return "INT";
-    } else if (o.equals(Double.class)) {
-      return "DOUBLE";
-    } else if (o.equals(Float.class)) {
-      return "DOUBLE";
-    } else if (o.equals(Boolean.class)) {
-      return "BOOL";
-    } else {
-      return "STRING";
-    }
-  }
-
-  private void registerStatements(String fromStatement, String selectStatement, String outputStream) {
-    this.siddhiAppString.append(fromStatement)
-            .append("\n")
-            .append(selectStatement)
-            .append("\n")
-            .append("insert into ")
-            .append(prepareName(outputStream))
-            .append(";");
-
-    LOG.info("Registering statement: \n" + this.siddhiAppString.toString());
-
+    String selectStatement = selectStatement(outputEventKeys, parameters);
+    this.siddhiConfig = new SiddhiInvocationConfig<>(parameters, fromStatement, selectStatement, inputStreamNames);
+    this.siddhiEngine.initializeEngine(this.siddhiConfig, spOutputCollector, runtimeContext);
   }
 
   @Override
   public void onEvent(org.apache.streampipes.model.runtime.Event event, SpOutputCollector collector) {
-    try {
-      String sourceId = event.getSourceInfo().getSourceId();
-      InputHandler inputHandler = siddhiInputHandlers.get(sourceId);
-      List<String> eventKeys = listOfEventKeys.get(sourceId);
-
-      inputHandler.send(toObjArr(eventKeys, event.getRaw()));
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-
-  private Object[] toObjArr(List<String> eventKeys, Map<String, Object> event) {
-    Object[] result = new Object[eventKeys.size()];
-    for (int i = 0; i < eventKeys.size(); i++) {
-      result[i] = event.get(eventKeys.get(i));
-    }
-
-    return result;
+    this.siddhiEngine.processEvent(event);
   }
 
   @Override
   public void onDetach() {
-    this.siddhiAppRuntime.shutdown();
+    this.siddhiEngine.shutdownEngine();
   }
 
-  protected abstract String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
-  protected abstract String selectStatement(final B bindingParameters);
-
-  protected String setTimestamp(final B bindingparameters) {
-    return null;
+  public SiddhiInvocationConfig<B> getSiddhiConfig() {
+    return this.siddhiConfig;
   }
 
-  protected String prepareName(String eventName) {
-    return eventName
-            .replaceAll("\\.", "")
-            .replaceAll("-", "")
-            .replaceAll("::", "");
+  public String prepareName(String name) {
+    return SiddhiUtils.prepareName(name);
   }
 
-
-
-  protected String getCustomOutputSelectStatement(DataProcessorInvocation invocation,
-                                                  String eventName) {
+  public String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
     StringBuilder selectString = new StringBuilder();
-    selectString.append("select ");
+    selectString.append(SiddhiConstants.SELECT).append(" ");
 
     if (outputEventKeys.size() > 0) {
-      for (int i = 0; i < outputEventKeys.size() - 1; i++) {
-        selectString.append(eventName + ".s0" + outputEventKeys.get(i) + ",");
+      for (int i=0; i<outputEventKeys.size() - 1; i++) {
+        selectString
+                .append(SiddhiConstants.FIRST_STREAM_PREFIX)
+                .append(outputEventKeys.get(i))
+                .append(",");
       }
-      selectString.append(eventName + ".s0" + outputEventKeys.get(outputEventKeys.size() - 1));
-
+      selectString
+              .append(SiddhiConstants.FIRST_STREAM_PREFIX)
+              .append(outputEventKeys.get(outputEventKeys.size() - 1));
     }
-
     return selectString.toString();
   }
 
-//  protected String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
-//    return getCustomOutputSelectStatement(invocation, "e1");
-//  }
-
-  protected String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
+  public String getCustomOutputSelectStatement(DataProcessorInvocation invocation,
+                                               String eventName) {
     StringBuilder selectString = new StringBuilder();
-    selectString.append("select ");
+    selectString.append(SiddhiConstants.SELECT).append(" ");
 
     if (outputEventKeys.size() > 0) {
-      for (int i=0; i<outputEventKeys.size() - 1; i++) {
-        selectString.append("s0" + outputEventKeys.get(i) + ",");
+      for (int i = 0; i < outputEventKeys.size() - 1; i++) {
+        selectString.append(eventName).append(".s0").append(outputEventKeys.get(i)).append(",");
       }
-      selectString.append("s0" + outputEventKeys.get(outputEventKeys.size() - 1));
-    }
-    return selectString.toString();
-  }
+      selectString.append(eventName).append(".s0").append(outputEventKeys.get(outputEventKeys.size() - 1));
 
-  public void setSortedEventKeys(List<String> sortedEventKeys) {
-    String streamId = (String) this.listOfEventKeys.keySet().toArray()[0];    // only reliable if there is only one stream, else use changeEventKeys() to respective streamId
-    changeEventKeys(streamId, sortedEventKeys);
-  }
+    }
 
-  public void changeEventKeys(String streamId, List<String> newEventKeys) {
-    this.listOfEventKeys.put(streamId, newEventKeys);
+    return selectString.toString();
   }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java
new file mode 100644
index 0000000..62eb29c
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine;
+
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.EventTypeGenerator;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiAppGenerator;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class SiddhiInvocationConfig<B extends EventProcessorBindingParams> {
+
+  private final B params;
+  private final String siddhiAppString;
+  private final List<String> inputStreamNames;
+  private final Map<String, List<EventType>> eventTypeInfo;
+  private final List<String> outputEventKeys;
+
+  public SiddhiInvocationConfig(B params,
+                                String fromStatement,
+                                String selectStatement,
+                                List<String> inputStreamNames) {
+    this.params = params;
+    this.inputStreamNames = inputStreamNames;
+    this.eventTypeInfo = new EventTypeGenerator<>(params).generateEventTypes();
+    this.siddhiAppString = new SiddhiAppGenerator<>(params, inputStreamNames, eventTypeInfo, fromStatement, selectStatement)
+            .generateSiddhiApp();
+    this.outputEventKeys = new ArrayList<>(this.params.getOutEventType().keySet());
+  }
+
+  public B getParams() {
+    return params;
+  }
+
+  public String getSiddhiAppString() {
+    return siddhiAppString;
+  }
+
+  public List<String> getInputStreamNames() {
+    return inputStreamNames;
+  }
+
+  public Map<String, List<EventType>> getEventTypeInfo() {
+    return eventTypeInfo;
+  }
+
+  public List<String> getOutputEventKeys() {
+    return this.outputEventKeys;
+  }
+
+  public String getSiddhiApp() {
+    return this.siddhiAppString;
+  }
+
+
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
similarity index 70%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
index 540d608..d7adcb9 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
@@ -17,10 +17,14 @@
  */
 package org.apache.streampipes.wrapper.siddhi.engine;
 
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
-import io.siddhi.core.event.Event;
+import java.util.List;
 
-public interface SiddhiDebugCallback {
+public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+
+  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+
+  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
 
-  void onEvent(Event event);
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
new file mode 100644
index 0000000..d603e9a
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcessor implements SiddhiStatementGenerator<ProcessorParams> {
+
+  private SiddhiEngine siddhiEngine;
+
+  public StreamPipesSiddhiProcessor() {
+    this.siddhiEngine = new SiddhiEngine();
+  }
+
+  public StreamPipesSiddhiProcessor(SiddhiDebugCallback debugCallback) {
+    this.siddhiEngine = new SiddhiEngine(debugCallback);
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+    this.siddhiEngine.processEvent(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    this.siddhiEngine.shutdownEngine();
+  }
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiDebugCallback.java
similarity index 93%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiDebugCallback.java
index 540d608..838071f 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiDebugCallback.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.engine.callback;
 
 
 import io.siddhi.core.event.Event;
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
new file mode 100644
index 0000000..947e99a
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine.callback;
+
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.output.StreamCallback;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
+
+import java.util.List;
+
+public class SiddhiOutputStreamCallback extends StreamCallback {
+
+  private SpOutputCollector collector;
+  private List<String> outputEventKeys;
+  private EventProcessorRuntimeContext runtimeContext;
+
+  public SiddhiOutputStreamCallback(SpOutputCollector collector,
+                                    List<String> outputEventKeys,
+                                    EventProcessorRuntimeContext runtimeContext) {
+    this.collector = collector;
+    this.outputEventKeys = outputEventKeys;
+    this.runtimeContext = runtimeContext;
+  }
+
+  @Override
+  public void receive(Event[] events) {
+    if (events.length > 0) {
+      Event lastEvent = events[events.length - 1];
+      collector.collect(SiddhiUtils.toSpEvent(lastEvent, outputEventKeys,
+              runtimeContext.getOutputSchemaInfo
+                      (), runtimeContext.getOutputSourceInfo()));
+    }
+  }
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamDebugCallback.java
similarity index 55%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamDebugCallback.java
index 540d608..a79f911 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamDebugCallback.java
@@ -15,12 +15,28 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
-
+package org.apache.streampipes.wrapper.siddhi.engine.callback;
 
 import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.output.StreamCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SiddhiOutputStreamDebugCallback extends StreamCallback {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SiddhiOutputStreamDebugCallback.class);
+
+  private SiddhiDebugCallback callback;
 
-public interface SiddhiDebugCallback {
+  public SiddhiOutputStreamDebugCallback(SiddhiDebugCallback callback) {
+    this.callback = callback;
+  }
 
-  void onEvent(Event event);
+  @Override
+  public void receive(Event[] events) {
+    LOG.info("Siddhi is firing");
+    if (events.length > 0) {
+      this.callback.onEvent(events[events.length - 1]);
+    }
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
new file mode 100644
index 0000000..1aa95a4
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine.generator;
+
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class EventTypeGenerator<B extends EventProcessorBindingParams> {
+
+  private B params;
+
+  public EventTypeGenerator(B params) {
+    this.params = params;
+  }
+
+  public Map<String, List<EventType>> generateEventTypes() {
+    Map<String, List<EventType>> listOfEventKeys = new HashMap<>();
+    AtomicReference<Integer> currentStreamIndex = new AtomicReference<>(0);
+
+    params.getInEventTypes().forEach((key, value) -> {
+      List<EventType> sortedEventKeys = new ArrayList<>();
+      for (String propertyKey : value.keySet()) {
+        sortedEventKeys.add(makeEventType(currentStreamIndex.get(), propertyKey, value.get(propertyKey)));
+        sortedEventKeys.sort(Comparator.comparing(EventType::getEventTypeName));
+      }
+      listOfEventKeys.put(key, sortedEventKeys);
+      currentStreamIndex.getAndSet(currentStreamIndex.get() + 1);
+    });
+
+    return listOfEventKeys;
+  }
+
+  private EventType makeEventType(Integer currentStreamIndex, String propertyName, Object propertyType) {
+    return new EventType(currentStreamIndex, propertyName, toType((Class<?>) propertyType));
+  }
+
+  private String toType(Class<?> o) {
+    if (o.equals(Long.class)) {
+      return SiddhiConstants.SIDDHI_LONG_TYPE;
+    } else if (o.equals(Integer.class)) {
+      return SiddhiConstants.SIDDHI_INT_TYPE;
+    } else if (o.equals(Double.class)) {
+      return SiddhiConstants.SIDDHI_DOUBLE_TYPE;
+    } else if (o.equals(Float.class)) {
+      return SiddhiConstants.SIDDHI_DOUBLE_TYPE;
+    } else if (o.equals(Boolean.class)) {
+      return SiddhiConstants.SIDDHI_BOOLEAN_TYPE;
+    } else if (o.equals(String.class)){
+      return SiddhiConstants.SIDDHI_STRING_TYPE;
+    } else {
+      return SiddhiConstants.SIDDHI_OBJECT_TYPE;
+    }
+  }
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/InputStreamNameGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/InputStreamNameGenerator.java
new file mode 100644
index 0000000..0bd8ee9
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/InputStreamNameGenerator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine.generator;
+
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InputStreamNameGenerator<B extends EventProcessorBindingParams> {
+
+  private B params;
+
+  public InputStreamNameGenerator(B params) {
+    this.params = params;
+  }
+
+  public List<String> generateInputStreamNames() {
+    List<String> inputStreamNames = new ArrayList<>();
+    this.params.getInEventTypes().forEach((key, value) -> {
+      // TODO why is the prefix not in the parameters.getInEventType
+      //registerEventTypeIfNotExists(key, value);
+      inputStreamNames.add(SiddhiUtils.prepareName(key));
+    });
+
+    return inputStreamNames;
+  }
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
new file mode 100644
index 0000000..0c54f16
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine.generator;
+
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
+import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SiddhiAppGenerator.class);
+
+  private B params;
+  private List<String> inputStreamNames;
+  private Map<String, List<EventType>> eventTypes;
+  private final String fromStatement;
+  private final String selectStatement;
+
+  private final StringBuilder siddhiAppString;
+
+  public SiddhiAppGenerator(B params,
+                            List<String> inputStreamNames,
+                            Map<String, List<EventType>> eventTypes,
+                            String fromStatement,
+                            String selectStatement) {
+    this.params = params;
+    this.inputStreamNames = inputStreamNames;
+    this.eventTypes = eventTypes;
+    this.fromStatement = fromStatement;
+    this.selectStatement = selectStatement;
+    this.siddhiAppString = new StringBuilder();
+  }
+
+  public String generateSiddhiApp() {
+    LOG.info("Configuring event types for graph " + params.getGraph().getName());
+
+    this.eventTypes.forEach(this::registerEventType);
+    registerStatements(fromStatement, selectStatement, SiddhiUtils.getOutputTopicName(params));
+
+    return this.siddhiAppString.toString();
+  }
+
+  private void registerEventType(String eventTypeName, List<EventType> eventSchema) {
+    String defineStreamPrefix = "define stream " + SiddhiUtils.prepareName(eventTypeName);
+    StringJoiner joiner = new StringJoiner(",");
+
+    eventSchema.forEach(typeInfo -> {
+        joiner.add("s" + typeInfo.getStreamIdentifier() + typeInfo.getEventTypeName() + " " + typeInfo.getEventType());
+    });
+
+    this.siddhiAppString
+            .append(defineStreamPrefix)
+            .append("(")
+            .append(joiner.toString())
+            .append(");\n");
+  }
+
+  private void registerStatements(String fromStatement, String selectStatement, String outputStream) {
+    this.siddhiAppString
+            .append(fromStatement)
+            .append("\n")
+            .append(selectStatement)
+            .append("\n")
+            .append("insert into ")
+            .append(SiddhiUtils.prepareName(outputStream))
+            .append(";");
+
+    LOG.info("Registering statement: \n" + this.siddhiAppString.toString());
+
+  }
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
similarity index 58%
rename from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
rename to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
index 540d608..1264caf 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiDebugCallback.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
@@ -15,12 +15,30 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.model;
 
+public class EventType {
 
-import io.siddhi.core.event.Event;
+  private Integer streamIdentifier;
+  private String eventTypeName;
+  private String eventType;
 
-public interface SiddhiDebugCallback {
+  public EventType(Integer streamIdentifier, String eventTypeName, String eventType) {
+    this.streamIdentifier = streamIdentifier;
+    this.eventTypeName = eventTypeName;
+    this.eventType = eventType;
+  }
+
+  public Integer getStreamIdentifier() {
+    return streamIdentifier;
+  }
+
+  public String getEventTypeName() {
+    return eventTypeName;
+  }
+
+  public String getEventType() {
+    return eventType;
+  }
 
-  void onEvent(Event event);
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
new file mode 100644
index 0000000..2292f18
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.utils;
+
+import io.siddhi.core.event.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.runtime.SchemaInfo;
+import org.apache.streampipes.model.runtime.SourceInfo;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SiddhiUtils {
+
+  public static org.apache.streampipes.model.runtime.Event toSpEvent(Event event, List<String> outputEventKeys, SchemaInfo
+          schemaInfo, SourceInfo sourceInfo) {
+    Map<String, Object> outMap = new HashMap<>();
+
+    for (int i = 0; i < outputEventKeys.size(); i++) {
+      if (event.getData(i) instanceof List) {
+        List<Object> tmp = (List<Object>) event.getData(i);
+        outMap.put(outputEventKeys.get(i), tmp.get(0));
+      }
+      else {
+        outMap.put(outputEventKeys.get(i), event.getData(i));
+      }
+    }
+    return EventFactory.fromMap(outMap, sourceInfo, schemaInfo);
+  }
+
+  public static Object[] toObjArr(List<String> eventKeys, Map<String, Object> event) {
+    Object[] result = new Object[eventKeys.size()];
+    for (int i = 0; i < eventKeys.size(); i++) {
+      result[i] = event.get(eventKeys.get(i));
+    }
+
+    return result;
+  }
+
+  public static String getOutputTopicName(EventProcessorBindingParams parameters) {
+    return parameters
+            .getGraph()
+            .getOutputStream()
+            .getEventGrounding()
+            .getTransportProtocol()
+            .getTopicDefinition()
+            .getActualTopicName();
+  }
+
+  public static String prepareName(String eventName) {
+    return eventName
+            .replaceAll("\\.", "")
+            .replaceAll("-", "")
+            .replaceAll("::", "");
+  }
+}


[incubator-streampipes] 03/03: [STREAMPIPES-250] Refactor Siddhi Wrapper

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 8c5beb86f5c5e5cf36d39476b7e042674bf2d296
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Tue Oct 20 22:40:39 2020 +0200

    [STREAMPIPES-250] Refactor Siddhi Wrapper
---
 streampipes-wrapper-siddhi/pom.xml                 |  6 ++
 .../wrapper/siddhi/constants/SiddhiConstants.java  |  2 +
 .../wrapper/siddhi/engine/SiddhiEngine.java        | 23 ++++---
 .../wrapper/siddhi/engine/SiddhiEventEngine.java   | 55 ++--------------
 .../siddhi/engine/SiddhiInvocationConfig.java      | 74 ----------------------
 .../siddhi/engine/SiddhiStatementGenerator.java    |  7 +-
 .../siddhi/engine/StreamPipesSiddhiProcessor.java  |  5 +-
 .../callback/SiddhiOutputStreamCallback.java       | 17 +++--
 .../engine/generator/EventTypeGenerator.java       |  8 ++-
 .../engine/generator/SiddhiAppGenerator.java       | 22 +++----
 .../generator/SiddhiInvocationConfigGenerator.java | 65 +++++++++++++++++++
 .../wrapper/siddhi/model/EventType.java            | 27 ++++----
 .../SiddhiProcessorParams.java}                    | 72 ++++++++-------------
 .../AbstractQueryGenerator.java}                   | 19 ++++--
 .../FromClause.java}                               | 13 +---
 .../SelectClause.java}                             | 16 +++--
 .../EventType.java => query/SiddhiStatement.java}  | 29 ++++-----
 .../expression/Expression.java}                    | 17 +++--
 .../expression/Expressions.java}                   | 18 +++---
 .../expression/PropertyExpression.java}            | 13 +---
 .../wrapper/siddhi/utils/SiddhiUtils.java          | 25 +++++---
 21 files changed, 237 insertions(+), 296 deletions(-)

diff --git a/streampipes-wrapper-siddhi/pom.xml b/streampipes-wrapper-siddhi/pom.xml
index 1a8ce3a..fdbad74 100644
--- a/streampipes-wrapper-siddhi/pom.xml
+++ b/streampipes-wrapper-siddhi/pom.xml
@@ -52,6 +52,12 @@
             <groupId>io.siddhi</groupId>
             <artifactId>siddhi-query-compiler</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.siddhi.extension.execution.list</groupId>
+            <artifactId>siddhi-execution-list</artifactId>
+            <version>1.0.1</version>
+        </dependency>
+
     </dependencies>
 
     <repositories>
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
index 3d2a2a3..146be5a 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
@@ -21,6 +21,8 @@ public class SiddhiConstants {
 
   public static final String SELECT = "select";
   public static final String INSERT = "insert";
+  public static final String WHITESPACE = " ";
+  public static final String ASTERISK = "*";
 
   public static final String FIRST_STREAM_PREFIX = "s0";
   public static final String SECOND_STREAM_PREFIX = "s1";
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
index 173f30f..dcc7eed 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
@@ -21,14 +21,17 @@ import io.siddhi.core.SiddhiAppRuntime;
 import io.siddhi.core.SiddhiManager;
 import io.siddhi.core.stream.input.InputHandler;
 import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.StreamDefinition;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiOutputStreamCallback;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiOutputStreamDebugCallback;
-import org.apache.streampipes.wrapper.siddhi.model.EventType;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
 import org.apache.streampipes.wrapper.siddhi.manager.SpSiddhiManager;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
 import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,18 +63,18 @@ public class SiddhiEngine {
     this.debugMode = true;
   }
 
-  public void initializeEngine(SiddhiInvocationConfig<? extends EventProcessorBindingParams> settings,
+  public void initializeEngine(SiddhiInvocationConfigGenerator<? extends EventProcessorBindingParams> settings,
                                SpOutputCollector spOutputCollector,
                                EventProcessorRuntimeContext runtimeContext) {
 
-    EventProcessorBindingParams params = settings.getParams();
-    this.typeInfo = settings.getEventTypeInfo();
+    EventProcessorBindingParams params = settings.getSiddhiProcessorParams().getParams();
+    this.typeInfo = settings.getSiddhiProcessorParams().getEventTypeInfo();
     SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
 
     //this.timestampField = removeStreamIdFromTimestamp(setTimestamp(parameters));
 
     siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(settings.getSiddhiAppString());
-    settings.getParams()
+    settings.getSiddhiProcessorParams().getParams()
             .getInEventTypes()
             .forEach((key, value) -> {
               String preparedKey = SiddhiUtils.prepareName(key);
@@ -79,14 +82,16 @@ public class SiddhiEngine {
             });
 
     StreamCallback callback;
-
+    Map<String, StreamDefinition> streamDef = siddhiAppRuntime.getStreamDefinitionMap();
+    String outputKey = SiddhiUtils.getPreparedOutputTopicName(params);
+    List<Attribute> streamAttributes = streamDef.get(outputKey).getAttributeList();
     if (!debugMode) {
-      callback = new SiddhiOutputStreamCallback(spOutputCollector, settings.getOutputEventKeys(), runtimeContext);
+      callback = new SiddhiOutputStreamCallback(spOutputCollector, runtimeContext, streamAttributes);
     } else {
       callback = new SiddhiOutputStreamDebugCallback(debugCallback);
     }
 
-    siddhiAppRuntime.addCallback(SiddhiUtils.prepareName(SiddhiUtils.getOutputTopicName(params)), callback);
+    siddhiAppRuntime.addCallback(SiddhiUtils.getPreparedOutputTopicName(params), callback);
     siddhiAppRuntime.start();
   }
 
@@ -97,7 +102,7 @@ public class SiddhiEngine {
       List<String> eventKeys = this.typeInfo
               .get(sourceId)
               .stream()
-              .map(EventType::getEventTypeName)
+              .map(EventType::getFieldName)
               .collect(Collectors.toList());
 
       inputHandler.send(SiddhiUtils.toObjArr(eventKeys, event.getRaw()));
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
index 585ac5a..a6ce6e1 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
@@ -17,29 +17,22 @@
  */
 package org.apache.streampipes.wrapper.siddhi.engine;
 
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
-import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.InputStreamNameGenerator;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
 import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> implements
         EventProcessor<B>, SiddhiStatementGenerator<B> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);
 
   private SiddhiEngine siddhiEngine;
-  private SiddhiInvocationConfig<B> siddhiConfig;
-  private List<String> outputEventKeys = new ArrayList<>();
 
   public SiddhiEventEngine() {
     this.siddhiEngine = new SiddhiEngine();
@@ -51,12 +44,9 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
 
   @Override
   public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
-    List<String> inputStreamNames = new InputStreamNameGenerator<>(parameters).generateInputStreamNames();
-    this.outputEventKeys = new ArrayList<>(parameters.getOutEventType().keySet());
-    String fromStatement = fromStatement(inputStreamNames, parameters);
-    String selectStatement = selectStatement(outputEventKeys, parameters);
-    this.siddhiConfig = new SiddhiInvocationConfig<>(parameters, fromStatement, selectStatement, inputStreamNames);
-    this.siddhiEngine.initializeEngine(this.siddhiConfig, spOutputCollector, runtimeContext);
+    SiddhiInvocationConfigGenerator<B> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
+            this::fromStatement, this::selectStatement);
+    this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
   }
 
   @Override
@@ -69,45 +59,8 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
     this.siddhiEngine.shutdownEngine();
   }
 
-  public SiddhiInvocationConfig<B> getSiddhiConfig() {
-    return this.siddhiConfig;
-  }
-
   public String prepareName(String name) {
     return SiddhiUtils.prepareName(name);
   }
 
-  public String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
-    StringBuilder selectString = new StringBuilder();
-    selectString.append(SiddhiConstants.SELECT).append(" ");
-
-    if (outputEventKeys.size() > 0) {
-      for (int i=0; i<outputEventKeys.size() - 1; i++) {
-        selectString
-                .append(SiddhiConstants.FIRST_STREAM_PREFIX)
-                .append(outputEventKeys.get(i))
-                .append(",");
-      }
-      selectString
-              .append(SiddhiConstants.FIRST_STREAM_PREFIX)
-              .append(outputEventKeys.get(outputEventKeys.size() - 1));
-    }
-    return selectString.toString();
-  }
-
-  public String getCustomOutputSelectStatement(DataProcessorInvocation invocation,
-                                               String eventName) {
-    StringBuilder selectString = new StringBuilder();
-    selectString.append(SiddhiConstants.SELECT).append(" ");
-
-    if (outputEventKeys.size() > 0) {
-      for (int i = 0; i < outputEventKeys.size() - 1; i++) {
-        selectString.append(eventName).append(".s0").append(outputEventKeys.get(i)).append(",");
-      }
-      selectString.append(eventName).append(".s0").append(outputEventKeys.get(outputEventKeys.size() - 1));
-
-    }
-
-    return selectString.toString();
-  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java
deleted file mode 100644
index 62eb29c..0000000
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.wrapper.siddhi.engine;
-
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.EventTypeGenerator;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiAppGenerator;
-import org.apache.streampipes.wrapper.siddhi.model.EventType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class SiddhiInvocationConfig<B extends EventProcessorBindingParams> {
-
-  private final B params;
-  private final String siddhiAppString;
-  private final List<String> inputStreamNames;
-  private final Map<String, List<EventType>> eventTypeInfo;
-  private final List<String> outputEventKeys;
-
-  public SiddhiInvocationConfig(B params,
-                                String fromStatement,
-                                String selectStatement,
-                                List<String> inputStreamNames) {
-    this.params = params;
-    this.inputStreamNames = inputStreamNames;
-    this.eventTypeInfo = new EventTypeGenerator<>(params).generateEventTypes();
-    this.siddhiAppString = new SiddhiAppGenerator<>(params, inputStreamNames, eventTypeInfo, fromStatement, selectStatement)
-            .generateSiddhiApp();
-    this.outputEventKeys = new ArrayList<>(this.params.getOutEventType().keySet());
-  }
-
-  public B getParams() {
-    return params;
-  }
-
-  public String getSiddhiAppString() {
-    return siddhiAppString;
-  }
-
-  public List<String> getInputStreamNames() {
-    return inputStreamNames;
-  }
-
-  public Map<String, List<EventType>> getEventTypeInfo() {
-    return eventTypeInfo;
-  }
-
-  public List<String> getOutputEventKeys() {
-    return this.outputEventKeys;
-  }
-
-  public String getSiddhiApp() {
-    return this.siddhiAppString;
-  }
-
-
-}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
index d7adcb9..9abc569 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
@@ -18,13 +18,12 @@
 package org.apache.streampipes.wrapper.siddhi.engine;
 
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
 
 public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
 
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+  String fromStatement(SiddhiProcessorParams<B> siddhiParams);
 
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+  String selectStatement(SiddhiProcessorParams<B> siddhiParams);
 
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
index d603e9a..2aabb4c 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
 import org.apache.streampipes.wrapper.standalone.ProcessorParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
@@ -39,7 +40,9 @@ public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcesso
 
   @Override
   public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
+    SiddhiInvocationConfigGenerator<ProcessorParams> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
+            this::fromStatement, this::selectStatement);
+    this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
   }
 
   @Override
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
index 947e99a..95dcf1e 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.wrapper.siddhi.engine.callback;
 
 import io.siddhi.core.event.Event;
 import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.Attribute;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
@@ -28,24 +29,26 @@ import java.util.List;
 public class SiddhiOutputStreamCallback extends StreamCallback {
 
   private SpOutputCollector collector;
-  private List<String> outputEventKeys;
   private EventProcessorRuntimeContext runtimeContext;
 
+  private List<Attribute> streamAttributes;
+
   public SiddhiOutputStreamCallback(SpOutputCollector collector,
-                                    List<String> outputEventKeys,
-                                    EventProcessorRuntimeContext runtimeContext) {
+                                    EventProcessorRuntimeContext runtimeContext,
+                                    List<Attribute> streamAttributes) {
     this.collector = collector;
-    this.outputEventKeys = outputEventKeys;
     this.runtimeContext = runtimeContext;
+    this.streamAttributes = streamAttributes;
   }
 
   @Override
   public void receive(Event[] events) {
     if (events.length > 0) {
       Event lastEvent = events[events.length - 1];
-      collector.collect(SiddhiUtils.toSpEvent(lastEvent, outputEventKeys,
-              runtimeContext.getOutputSchemaInfo
-                      (), runtimeContext.getOutputSourceInfo()));
+      collector.collect(SiddhiUtils.toSpEvent(lastEvent,
+              runtimeContext.getOutputSchemaInfo(),
+              runtimeContext.getOutputSourceInfo(),
+              streamAttributes));
     }
   }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
index 1aa95a4..7c1d28c 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
@@ -40,7 +40,7 @@ public class EventTypeGenerator<B extends EventProcessorBindingParams> {
       List<EventType> sortedEventKeys = new ArrayList<>();
       for (String propertyKey : value.keySet()) {
         sortedEventKeys.add(makeEventType(currentStreamIndex.get(), propertyKey, value.get(propertyKey)));
-        sortedEventKeys.sort(Comparator.comparing(EventType::getEventTypeName));
+        sortedEventKeys.sort(Comparator.comparing(EventType::getFieldName));
       }
       listOfEventKeys.put(key, sortedEventKeys);
       currentStreamIndex.getAndSet(currentStreamIndex.get() + 1);
@@ -50,7 +50,7 @@ public class EventTypeGenerator<B extends EventProcessorBindingParams> {
   }
 
   private EventType makeEventType(Integer currentStreamIndex, String propertyName, Object propertyType) {
-    return new EventType(currentStreamIndex, propertyName, toType((Class<?>) propertyType));
+    return new EventType(toSelectorPrefix(currentStreamIndex), propertyName, toType((Class<?>) propertyType));
   }
 
   private String toType(Class<?> o) {
@@ -70,4 +70,8 @@ public class EventTypeGenerator<B extends EventProcessorBindingParams> {
       return SiddhiConstants.SIDDHI_OBJECT_TYPE;
     }
   }
+
+  private String toSelectorPrefix(Integer currentStreamIndex) {
+    return currentStreamIndex == 0 ? SiddhiConstants.FIRST_STREAM_PREFIX : SiddhiConstants.SECOND_STREAM_PREFIX;
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
index 0c54f16..b8fd175 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
@@ -18,45 +18,39 @@
 package org.apache.streampipes.wrapper.siddhi.engine.generator;
 
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
 import org.apache.streampipes.wrapper.siddhi.model.EventType;
 import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
 import java.util.StringJoiner;
 
 public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SiddhiAppGenerator.class);
 
-  private B params;
-  private List<String> inputStreamNames;
-  private Map<String, List<EventType>> eventTypes;
+  private final SiddhiProcessorParams<B> siddhiParams;
   private final String fromStatement;
   private final String selectStatement;
 
   private final StringBuilder siddhiAppString;
 
-  public SiddhiAppGenerator(B params,
-                            List<String> inputStreamNames,
-                            Map<String, List<EventType>> eventTypes,
+  public SiddhiAppGenerator(SiddhiProcessorParams<B> siddhiParams,
                             String fromStatement,
                             String selectStatement) {
-    this.params = params;
-    this.inputStreamNames = inputStreamNames;
-    this.eventTypes = eventTypes;
+    this.siddhiParams = siddhiParams;
     this.fromStatement = fromStatement;
     this.selectStatement = selectStatement;
     this.siddhiAppString = new StringBuilder();
   }
 
   public String generateSiddhiApp() {
-    LOG.info("Configuring event types for graph " + params.getGraph().getName());
+    LOG.info("Configuring event types for graph " + this.siddhiParams.getParams().getGraph().getName());
 
-    this.eventTypes.forEach(this::registerEventType);
-    registerStatements(fromStatement, selectStatement, SiddhiUtils.getOutputTopicName(params));
+    this.siddhiParams.getEventTypeInfo().forEach(this::registerEventType);
+    registerStatements(fromStatement, selectStatement, SiddhiUtils.getOutputTopicName(this.siddhiParams.getParams()));
 
     return this.siddhiAppString.toString();
   }
@@ -66,7 +60,7 @@ public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
     StringJoiner joiner = new StringJoiner(",");
 
     eventSchema.forEach(typeInfo -> {
-        joiner.add("s" + typeInfo.getStreamIdentifier() + typeInfo.getEventTypeName() + " " + typeInfo.getEventType());
+      joiner.add(typeInfo.getSelectorPrefix() + typeInfo.getFieldName() + " " + typeInfo.getFieldType());
     });
 
     this.siddhiAppString
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
new file mode 100644
index 0000000..9c25f15
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine.generator;
+
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class SiddhiInvocationConfigGenerator<B extends EventProcessorBindingParams> {
+
+  private final String siddhiAppString;
+  private final String fromStatement;
+  private final String selectStatement;
+
+  private final SiddhiProcessorParams<B> siddhiProcessorParams;
+
+  public SiddhiInvocationConfigGenerator(B params,
+                                         Function<SiddhiProcessorParams<B>, String> fromStatementFunction,
+                                         Function<SiddhiProcessorParams<B>, String> selectStatementFunction) {
+    List<String> inputStreamNames = new InputStreamNameGenerator<>(params).generateInputStreamNames();
+    Map<String, List<EventType>> eventTypeInfo = new EventTypeGenerator<>(params).generateEventTypes();
+    List<String> outputEventKeys = new ArrayList<>(params.getOutEventType().keySet());
+    this.siddhiProcessorParams = new SiddhiProcessorParams<>(params, inputStreamNames, eventTypeInfo, outputEventKeys);
+    this.fromStatement = fromStatementFunction.apply(this.siddhiProcessorParams);
+    this.selectStatement = selectStatementFunction.apply(this.siddhiProcessorParams);
+    this.siddhiAppString = new SiddhiAppGenerator<>(siddhiProcessorParams, fromStatement, selectStatement)
+            .generateSiddhiApp();
+  }
+
+  public String getSiddhiAppString() {
+    return siddhiAppString;
+  }
+
+  public SiddhiProcessorParams<B> getSiddhiProcessorParams() {
+    return siddhiProcessorParams;
+  }
+
+  public String getFromStatement() {
+    return fromStatement;
+  }
+
+  public String getSelectStatement() {
+    return selectStatement;
+  }
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
index 1264caf..d1e324f 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
@@ -19,26 +19,25 @@ package org.apache.streampipes.wrapper.siddhi.model;
 
 public class EventType {
 
-  private Integer streamIdentifier;
-  private String eventTypeName;
-  private String eventType;
+  private String selectorPrefix;
+  private String fieldName;
+  private String fieldType;
 
-  public EventType(Integer streamIdentifier, String eventTypeName, String eventType) {
-    this.streamIdentifier = streamIdentifier;
-    this.eventTypeName = eventTypeName;
-    this.eventType = eventType;
+  public EventType(String selectorPrefix, String fieldName, String fieldType) {
+    this.selectorPrefix = selectorPrefix;
+    this.fieldName = fieldName;
+    this.fieldType = fieldType;
   }
 
-  public Integer getStreamIdentifier() {
-    return streamIdentifier;
+  public String getSelectorPrefix() {
+    return selectorPrefix;
   }
 
-  public String getEventTypeName() {
-    return eventTypeName;
+  public String getFieldName() {
+    return fieldName;
   }
 
-  public String getEventType() {
-    return eventType;
+  public String getFieldType() {
+    return fieldType;
   }
-
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
similarity index 51%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
index 585ac5a..0a2ea57 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
@@ -15,66 +15,46 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.model;
 
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
 import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
-import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.InputStreamNameGenerator;
-import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.List;
-
-public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> implements
-        EventProcessor<B>, SiddhiStatementGenerator<B> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);
-
-  private SiddhiEngine siddhiEngine;
-  private SiddhiInvocationConfig<B> siddhiConfig;
-  private List<String> outputEventKeys = new ArrayList<>();
-
-  public SiddhiEventEngine() {
-    this.siddhiEngine = new SiddhiEngine();
-  }
-
-  public SiddhiEventEngine(SiddhiDebugCallback debugCallback) {
-    this.siddhiEngine = new SiddhiEngine(debugCallback);
-  }
-
-  @Override
-  public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
-    List<String> inputStreamNames = new InputStreamNameGenerator<>(parameters).generateInputStreamNames();
-    this.outputEventKeys = new ArrayList<>(parameters.getOutEventType().keySet());
-    String fromStatement = fromStatement(inputStreamNames, parameters);
-    String selectStatement = selectStatement(outputEventKeys, parameters);
-    this.siddhiConfig = new SiddhiInvocationConfig<>(parameters, fromStatement, selectStatement, inputStreamNames);
-    this.siddhiEngine.initializeEngine(this.siddhiConfig, spOutputCollector, runtimeContext);
+import java.util.Map;
+
+public class SiddhiProcessorParams<B extends EventProcessorBindingParams> {
+
+  private final B params;
+  private final List<String> inputStreamNames;
+  private final Map<String, List<EventType>> eventTypeInfo;
+  private final List<String> outputEventKeys;
+
+  public SiddhiProcessorParams(B params,
+                               List<String> inputStreamNames,
+                               Map<String, List<EventType>> eventTypeInfo,
+                               List<String> outputEventKeys) {
+    this.params = params;
+    this.inputStreamNames = inputStreamNames;
+    this.eventTypeInfo = eventTypeInfo;
+    this.outputEventKeys = outputEventKeys;
   }
 
-  @Override
-  public void onEvent(org.apache.streampipes.model.runtime.Event event, SpOutputCollector collector) {
-    this.siddhiEngine.processEvent(event);
+  public B getParams() {
+    return params;
   }
 
-  @Override
-  public void onDetach() {
-    this.siddhiEngine.shutdownEngine();
+  public List<String> getInputStreamNames() {
+    return inputStreamNames;
   }
 
-  public SiddhiInvocationConfig<B> getSiddhiConfig() {
-    return this.siddhiConfig;
+  public Map<String, List<EventType>> getEventTypeInfo() {
+    return eventTypeInfo;
   }
 
-  public String prepareName(String name) {
-    return SiddhiUtils.prepareName(name);
+  public List<String> getOutputEventKeys() {
+    return outputEventKeys;
   }
 
   public String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java
similarity index 61%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java
index d7adcb9..c7fe3be 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java
@@ -15,16 +15,23 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query;
 
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
 
-import java.util.List;
+import java.util.Arrays;
+import java.util.StringJoiner;
 
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+public abstract class AbstractQueryGenerator {
 
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+  public static String join(String... substrings) {
+    StringJoiner joiner = new StringJoiner(SiddhiConstants.WHITESPACE);
+    Arrays.stream(substrings).forEach(joiner::add);
 
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+    return joiner.toString();
+  }
 
+  public static String fromString(String query) {
+    return null;
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java
similarity index 66%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java
index d7adcb9..6667e89 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java
@@ -15,16 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
-
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
-
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+package org.apache.streampipes.wrapper.siddhi.query;
 
+public class FromClause extends AbstractQueryGenerator {
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java
similarity index 64%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java
index d7adcb9..c126880 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java
@@ -15,16 +15,20 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query;
 
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
+import org.apache.streampipes.wrapper.siddhi.query.expression.Expression;
 
 import java.util.List;
 
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+public class SelectClause extends AbstractQueryGenerator {
 
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+  public static String createWildcard() {
+    return join(SiddhiConstants.SELECT, SiddhiConstants.ASTERISK);
+  }
 
+  public static String create(List<Expression> outputEventProperties) {
+    return null;
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java
similarity index 58%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java
index 1264caf..44bfcf5 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java
@@ -15,30 +15,27 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.model;
+package org.apache.streampipes.wrapper.siddhi.query;
 
-public class EventType {
+public class SiddhiStatement {
 
-  private Integer streamIdentifier;
-  private String eventTypeName;
-  private String eventType;
+  private FromClause fromClause;
+  private SelectClause selectClause;
 
-  public EventType(Integer streamIdentifier, String eventTypeName, String eventType) {
-    this.streamIdentifier = streamIdentifier;
-    this.eventTypeName = eventTypeName;
-    this.eventType = eventType;
+  private SiddhiStatement(FromClause fromClause, SelectClause selectClause) {
+    this.fromClause = fromClause;
+    this.selectClause = selectClause;
   }
 
-  public Integer getStreamIdentifier() {
-    return streamIdentifier;
+  public static SiddhiStatement from(FromClause fromClause, SelectClause selectClause) {
+    return new SiddhiStatement(fromClause, selectClause);
   }
 
-  public String getEventTypeName() {
-    return eventTypeName;
+  public FromClause getFromClause() {
+    return fromClause;
   }
 
-  public String getEventType() {
-    return eventType;
+  public SelectClause getSelectClause() {
+    return selectClause;
   }
-
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java
similarity index 63%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java
index d7adcb9..aa4d10d 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java
@@ -15,16 +15,21 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query.expression;
 
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
 
-import java.util.List;
+import java.util.Arrays;
+import java.util.StringJoiner;
 
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+public abstract class Expression {
 
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+  public String join(String... substrings) {
+    StringJoiner joiner = new StringJoiner(SiddhiConstants.WHITESPACE);
+    Arrays.stream(substrings).forEach(joiner::add);
 
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+    return joiner.toString();
+  }
 
+  public abstract String toSiddhiEpl();
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
similarity index 66%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
index d7adcb9..cf600e1 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query.expression;
 
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+public class Expressions {
 
-import java.util.List;
-
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
-
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+  public static Expression property(String propertyName) {
+    //return new PropertyExpression(propertyName);
+    return null;
+  }
 
+  public static Expression property(String propertyName, String targetName) {
+    return null;
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
similarity index 66%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
index d7adcb9..ceade46 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
@@ -15,16 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.engine;
-
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
-
-  String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
-  String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+package org.apache.streampipes.wrapper.siddhi.query.expression;
 
+public abstract class PropertyExpression extends Expression {
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
index 2292f18..b5bc847 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
@@ -18,10 +18,12 @@
 package org.apache.streampipes.wrapper.siddhi.utils;
 
 import io.siddhi.core.event.Event;
+import io.siddhi.query.api.definition.Attribute;
 import org.apache.streampipes.model.runtime.EventFactory;
 import org.apache.streampipes.model.runtime.SchemaInfo;
 import org.apache.streampipes.model.runtime.SourceInfo;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
 
 import java.util.HashMap;
 import java.util.List;
@@ -29,18 +31,19 @@ import java.util.Map;
 
 public class SiddhiUtils {
 
-  public static org.apache.streampipes.model.runtime.Event toSpEvent(Event event, List<String> outputEventKeys, SchemaInfo
-          schemaInfo, SourceInfo sourceInfo) {
+  public static org.apache.streampipes.model.runtime.Event toSpEvent(Event event,
+                                                                     SchemaInfo schemaInfo,
+                                                                     SourceInfo sourceInfo,
+                                                                     List<Attribute> streamAttributes) {
     Map<String, Object> outMap = new HashMap<>();
 
-    for (int i = 0; i < outputEventKeys.size(); i++) {
-      if (event.getData(i) instanceof List) {
-        List<Object> tmp = (List<Object>) event.getData(i);
-        outMap.put(outputEventKeys.get(i), tmp.get(0));
-      }
-      else {
-        outMap.put(outputEventKeys.get(i), event.getData(i));
+    for (int i = 0; i < streamAttributes.size(); i++) {
+      String outputKey = streamAttributes.get(i).getName();
+      if (outputKey.startsWith(SiddhiConstants.FIRST_STREAM_PREFIX) ||
+              outputKey.startsWith(SiddhiConstants.SECOND_STREAM_PREFIX)) {
+        outputKey = outputKey.substring(2);
       }
+        outMap.put(outputKey, event.getData(i));
     }
     return EventFactory.fromMap(outMap, sourceInfo, schemaInfo);
   }
@@ -54,6 +57,10 @@ public class SiddhiUtils {
     return result;
   }
 
+  public static String getPreparedOutputTopicName(EventProcessorBindingParams params) {
+    return prepareName(getOutputTopicName(params));
+  }
+
   public static String getOutputTopicName(EventProcessorBindingParams parameters) {
     return parameters
             .getGraph()