You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/22 20:09:15 UTC

[streampipes] 03/07: add checkstyle to streampipes-wrapper-standalone

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 3e9ffeb974b2afaa4493ac7490e0fcc4d12945de
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:00:58 2022 +0100

    add checkstyle to streampipes-wrapper-standalone
---
 streampipes-wrapper-standalone/pom.xml             |  43 ++--
 .../AbstractConfiguredPipelineElement.java         |  14 +-
 .../standalone/ConfiguredEventProcessor.java       |   6 +-
 .../wrapper/standalone/ConfiguredEventSink.java    |   5 +-
 .../ConfiguredExternalEventProcessor.java          |   8 +-
 .../standalone/ConfiguredExternalEventSink.java    |   6 +-
 .../standalone/StreamPipesDataProcessor.java       |   6 +-
 .../wrapper/standalone/StreamPipesDataSink.java    |   3 +-
 .../StreamPipesExternalDataProcessor.java          | 222 +++++++++++----------
 .../StandaloneEventProcessingDeclarer.java         |  21 +-
 .../StandaloneEventProcessorDeclarerSingleton.java |  27 ++-
 .../declarer/StandaloneEventSinkDeclarer.java      |  20 +-
 .../StandaloneEventSinkDeclarerSingleton.java      |  20 +-
 .../StandaloneExternalEventProcessingDeclarer.java |  26 ++-
 .../StandaloneExternalEventSinkDeclarer.java       |  29 ++-
 .../standalone/function/StreamPipesFunction.java   |   3 +-
 .../standalone/manager/ProtocolManager.java        |  30 +--
 .../standalone/routing/StandaloneSpCollector.java  |  12 +-
 .../routing/StandaloneSpInputCollector.java        |  12 +-
 .../routing/StandaloneSpOutputCollector.java       |  11 +-
 .../runtime/StandaloneEventProcessorRuntime.java   |  11 +-
 .../runtime/StandaloneEventSinkRuntime.java        |  11 +-
 .../StandaloneExternalEventProcessorRuntime.java   |  10 +-
 .../StandaloneExternalEventSinkRuntime.java        |   9 +-
 .../runtime/StandalonePipelineElementRuntime.java  |  24 +--
 25 files changed, 329 insertions(+), 260 deletions(-)

diff --git a/streampipes-wrapper-standalone/pom.xml b/streampipes-wrapper-standalone/pom.xml
index df12f29d9..a873cb7eb 100644
--- a/streampipes-wrapper-standalone/pom.xml
+++ b/streampipes-wrapper-standalone/pom.xml
@@ -16,23 +16,32 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.streampipes</groupId>
-		<artifactId>streampipes-parent</artifactId>
-		<version>0.91.0-SNAPSHOT</version>
-	</parent>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampipes</groupId>
+        <artifactId>streampipes-parent</artifactId>
+        <version>0.91.0-SNAPSHOT</version>
+    </parent>
 
-	<artifactId>streampipes-wrapper-standalone</artifactId>
-	<name>StreamPipes Wrapper for Standalone Pipeline Element Implementations</name>
+    <artifactId>streampipes-wrapper-standalone</artifactId>
+    <name>StreamPipes Wrapper for Standalone Pipeline Element Implementations</name>
 
-	<dependencies>
-		<!-- StreamPipes dependencies -->
-		<dependency>
-			<groupId>org.apache.streampipes</groupId>
-			<artifactId>streampipes-wrapper</artifactId>
-			<version>0.91.0-SNAPSHOT</version>
-		</dependency>
-	</dependencies>
+    <dependencies>
+        <!-- StreamPipes dependencies -->
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-wrapper</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java
index 0d1a189b8..01c42ba73 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java
@@ -23,20 +23,20 @@ import org.apache.streampipes.wrapper.runtime.PipelineElement;
 
 import java.util.function.Supplier;
 
-public class AbstractConfiguredPipelineElement<I extends InvocableStreamPipesEntity,
-        B extends BindingParams<I>,
-        T extends PipelineElement<B, I>> {
+public class AbstractConfiguredPipelineElement<K extends InvocableStreamPipesEntity,
+    V extends BindingParams<K>,
+    T extends PipelineElement<V, K>> {
 
-  private B bindingParams;
+  private V bindingParams;
   private Supplier<T> engineSupplier;
 
-  public AbstractConfiguredPipelineElement(B bindingParams,
-                                          Supplier<T> engineSupplier) {
+  public AbstractConfiguredPipelineElement(V bindingParams,
+                                           Supplier<T> engineSupplier) {
     this.bindingParams = bindingParams;
     this.engineSupplier = engineSupplier;
   }
 
-  public B getBindingParams() {
+  public V getBindingParams() {
     return bindingParams;
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java
index 551956e76..d4e77a9b2 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java
@@ -24,10 +24,10 @@ import org.apache.streampipes.wrapper.runtime.EventProcessor;
 
 import java.util.function.Supplier;
 
-public class ConfiguredEventProcessor<B extends EventProcessorBindingParams>
-        extends AbstractConfiguredPipelineElement<DataProcessorInvocation, B, EventProcessor<B>> {
+public class ConfiguredEventProcessor<T extends EventProcessorBindingParams>
+    extends AbstractConfiguredPipelineElement<DataProcessorInvocation, T, EventProcessor<T>> {
 
-  public ConfiguredEventProcessor(B bindingParams, Supplier<EventProcessor<B>> engineSupplier) {
+  public ConfiguredEventProcessor(T bindingParams, Supplier<EventProcessor<T>> engineSupplier) {
     super(bindingParams, engineSupplier);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java
index fbc8b05d9..f6a8e2b49 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java
@@ -24,9 +24,10 @@ import org.apache.streampipes.wrapper.runtime.EventSink;
 
 import java.util.function.Supplier;
 
-public class ConfiguredEventSink<B extends EventSinkBindingParams> extends AbstractConfiguredPipelineElement<DataSinkInvocation, B, EventSink<B>> {
+public class ConfiguredEventSink<T extends EventSinkBindingParams>
+    extends AbstractConfiguredPipelineElement<DataSinkInvocation, T, EventSink<T>> {
 
-  public ConfiguredEventSink(B bindingParams, Supplier<EventSink<B>> engineSupplier) {
+  public ConfiguredEventSink(T bindingParams, Supplier<EventSink<T>> engineSupplier) {
     super(bindingParams, engineSupplier);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java
index 147b11fcb..67fe96048 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java
@@ -23,11 +23,11 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
 
 import java.util.function.Supplier;
 
-public class ConfiguredExternalEventProcessor<B extends EventProcessorBindingParams>
-        extends AbstractConfiguredPipelineElement<DataProcessorInvocation, B,
-        ExternalEventProcessor<B>> {
+public class ConfiguredExternalEventProcessor<T extends EventProcessorBindingParams>
+    extends AbstractConfiguredPipelineElement<DataProcessorInvocation, T,
+    ExternalEventProcessor<T>> {
 
-  public ConfiguredExternalEventProcessor(B bindingParams, Supplier<ExternalEventProcessor<B>> engineSupplier) {
+  public ConfiguredExternalEventProcessor(T bindingParams, Supplier<ExternalEventProcessor<T>> engineSupplier) {
     super(bindingParams, engineSupplier);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java
index 38418fba3..9abf7b413 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java
@@ -23,10 +23,10 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventSink;
 
 import java.util.function.Supplier;
 
-public class ConfiguredExternalEventSink<B extends EventSinkBindingParams>
-        extends AbstractConfiguredPipelineElement<DataSinkInvocation, B, ExternalEventSink<B>> {
+public class ConfiguredExternalEventSink<T extends EventSinkBindingParams>
+    extends AbstractConfiguredPipelineElement<DataSinkInvocation, T, ExternalEventSink<T>> {
 
-  public ConfiguredExternalEventSink(B bindingParams, Supplier<ExternalEventSink<B>> engineSupplier) {
+  public ConfiguredExternalEventSink(T bindingParams, Supplier<ExternalEventSink<T>> engineSupplier) {
     super(bindingParams, engineSupplier);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
index f5dde0257..f938e413d 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
@@ -24,10 +24,12 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
 
 import java.util.function.Supplier;
 
-public abstract class StreamPipesDataProcessor extends StandaloneEventProcessingDeclarer<ProcessorParams> implements EventProcessor<ProcessorParams> {
+public abstract class StreamPipesDataProcessor extends StandaloneEventProcessingDeclarer<ProcessorParams>
+    implements EventProcessor<ProcessorParams> {
 
   @Override
-  public ConfiguredEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+  public ConfiguredEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
+                                                                ProcessingElementParameterExtractor extractor) {
     Supplier<EventProcessor<ProcessorParams>> supplier = () -> this;
     return new ConfiguredEventProcessor<>(new ProcessorParams(graph), supplier);
   }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
index b087b0b2b..5fa3dc4af 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
@@ -24,7 +24,8 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDec
 
 import java.util.function.Supplier;
 
-public abstract class StreamPipesDataSink extends StandaloneEventSinkDeclarer<SinkParams> implements EventSink<SinkParams> {
+public abstract class StreamPipesDataSink extends StandaloneEventSinkDeclarer<SinkParams>
+    implements EventSink<SinkParams> {
 
   @Override
   public ConfiguredEventSink<SinkParams> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
index 1992905c3..3b29d8a62 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
@@ -17,9 +17,6 @@
  */
 package org.apache.streampipes.wrapper.standalone;
 
-import com.google.gson.JsonObject;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -27,119 +24,124 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
 import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
 
+import com.google.gson.JsonObject;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+
 import java.io.IOException;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.Supplier;
 
-public abstract class StreamPipesExternalDataProcessor extends StandaloneExternalEventProcessingDeclarer<ProcessorParams>
-        implements ExternalEventProcessor<ProcessorParams> {
-
-    // endpoint of Python processor runs here
-    private static final String HTTP_PROTOCOL = "http://";
-    private static final String PYTHON_ENDPOINT = "localhost:5000";
-
-    private String invocationId;
-    private String appId;
-    private String inputTopic;
-    private String outputTopic;
-    private String kafkaUrl;
-
-    @Override
-    public ConfiguredExternalEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
-                                                                          ProcessingElementParameterExtractor extractor) {
-        EventProcessorBindingParams params = new ProcessorParams(graph);
-        invocationId = UUID.randomUUID().toString();
-        appId = graph.getAppId();
-        inputTopic = getInputTopic(params);
-        outputTopic = getOutputTopic(params);
-        kafkaUrl = getKafkaUrl(params);
-
-        Supplier<ExternalEventProcessor<ProcessorParams>> supplier = () -> this;
-        return new ConfiguredExternalEventProcessor<>(new ProcessorParams(graph), supplier);
-    }
-
-    protected JsonObject createMinimalInvocationGraph(Map<String, String> staticPropertyMap) {
-        JsonObject json = new JsonObject();
-
-        json.addProperty("invocation_id", invocationId);
-        json.addProperty("processor_id", appId);
-        json.addProperty("input_topics", inputTopic);
-        json.addProperty("output_topics", outputTopic);
-        json.addProperty("bootstrap_servers", kafkaUrl);
-
-        JsonObject staticProperties = new JsonObject();
-        staticPropertyMap.forEach(staticProperties::addProperty);
-        json.add("static_properties", staticProperties);
-
-        return json;
-    }
-
-    protected void invoke(JsonObject json) {
-        post("invoke", json.toString());
-    }
-
-    protected void detach () {
-        JsonObject json = new JsonObject();
-        json.addProperty("invocation_id", invocationId);
-        post("detach", json.toString());
-    }
-
-    private static String post(String endpoint, String payload) {
-        String responseString = null;
-
-        try {
-            responseString = Request.Post(HTTP_PROTOCOL + PYTHON_ENDPOINT + "/" + endpoint)
-                    .bodyString(payload, ContentType.APPLICATION_JSON)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
-                    .execute().returnContent().asString();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return responseString;
-    }
-
-    private String getInputTopic(EventProcessorBindingParams parameters) {
-        return parameters
-                .getGraph()
-                .getInputStreams()
-                .get(0)
-                .getEventGrounding()
-                .getTransportProtocol()
-                .getTopicDefinition()
-                .getActualTopicName();
-    }
-
-    private String getOutputTopic(EventProcessorBindingParams parameters) {
-        return parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocol()
-                .getTopicDefinition()
-                .getActualTopicName();
-    }
-
-    private String getKafkaUrl(EventProcessorBindingParams parameters) {
-        String brokerHostname = parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocols()
-                .get(0)
-                .getBrokerHostname();
-
-        Integer kafkaPort = ((KafkaTransportProtocol) parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocols()
-                .get(0))
-                .getKafkaPort();
-
-        return brokerHostname + ":" + kafkaPort.toString();
+public abstract class StreamPipesExternalDataProcessor
+    extends StandaloneExternalEventProcessingDeclarer<ProcessorParams>
+    implements ExternalEventProcessor<ProcessorParams> {
+
+  // address of the HTTP endpoint where the external Python processor runs
+  private static final String HTTP_PROTOCOL = "http://";
+  private static final String PYTHON_ENDPOINT = "localhost:5000";
+
+  private String invocationId;
+  private String appId;
+  private String inputTopic;
+  private String outputTopic;
+  private String kafkaUrl;
+
+  private static String post(String endpoint, String payload) {
+    String responseString = null;
+
+    try {
+      responseString = Request.Post(HTTP_PROTOCOL + PYTHON_ENDPOINT + "/" + endpoint)
+          .bodyString(payload, ContentType.APPLICATION_JSON)
+          .connectTimeout(1000)
+          .socketTimeout(100000)
+          .execute().returnContent().asString();
+    } catch (IOException e) {
+      e.printStackTrace();
     }
+    return responseString;
+  }
+
+  @Override
+  public ConfiguredExternalEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
+                                                                        ProcessingElementParameterExtractor extractor) {
+    EventProcessorBindingParams params = new ProcessorParams(graph);
+    invocationId = UUID.randomUUID().toString();
+    appId = graph.getAppId();
+    inputTopic = getInputTopic(params);
+    outputTopic = getOutputTopic(params);
+    kafkaUrl = getKafkaUrl(params);
+
+    Supplier<ExternalEventProcessor<ProcessorParams>> supplier = () -> this;
+    return new ConfiguredExternalEventProcessor<>(new ProcessorParams(graph), supplier);
+  }
+
+  protected JsonObject createMinimalInvocationGraph(Map<String, String> staticPropertyMap) {
+    JsonObject json = new JsonObject();
+
+    json.addProperty("invocation_id", invocationId);
+    json.addProperty("processor_id", appId);
+    json.addProperty("input_topics", inputTopic);
+    json.addProperty("output_topics", outputTopic);
+    json.addProperty("bootstrap_servers", kafkaUrl);
+
+    JsonObject staticProperties = new JsonObject();
+    staticPropertyMap.forEach(staticProperties::addProperty);
+    json.add("static_properties", staticProperties);
+
+    return json;
+  }
+
+  protected void invoke(JsonObject json) {
+    post("invoke", json.toString());
+  }
+
+  protected void detach() {
+    JsonObject json = new JsonObject();
+    json.addProperty("invocation_id", invocationId);
+    post("detach", json.toString());
+  }
+
+  private String getInputTopic(EventProcessorBindingParams parameters) {
+    return parameters
+        .getGraph()
+        .getInputStreams()
+        .get(0)
+        .getEventGrounding()
+        .getTransportProtocol()
+        .getTopicDefinition()
+        .getActualTopicName();
+  }
+
+  private String getOutputTopic(EventProcessorBindingParams parameters) {
+    return parameters
+        .getGraph()
+        .getOutputStream()
+        .getEventGrounding()
+        .getTransportProtocol()
+        .getTopicDefinition()
+        .getActualTopicName();
+  }
+
+  private String getKafkaUrl(EventProcessorBindingParams parameters) {
+    String brokerHostname = parameters
+        .getGraph()
+        .getOutputStream()
+        .getEventGrounding()
+        .getTransportProtocols()
+        .get(0)
+        .getBrokerHostname();
+
+    Integer kafkaPort = ((KafkaTransportProtocol) parameters
+        .getGraph()
+        .getOutputStream()
+        .getEventGrounding()
+        .getTransportProtocols()
+        .get(0))
+        .getKafkaPort();
+
+    return brokerHostname + ":" + kafkaPort.toString();
+  }
 
 }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
index 7bbc6d84c..d52cde04c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
@@ -29,22 +29,27 @@ import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
 
 @Deprecated(since = "0.70.0", forRemoval = true)
-public abstract class StandaloneEventProcessingDeclarer<B extends
-        EventProcessorBindingParams> extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
+public abstract class StandaloneEventProcessingDeclarer<T extends
+    EventProcessorBindingParams> extends EventProcessorDeclarer<T, StandaloneEventProcessorRuntime<T>> {
 
-  public abstract ConfiguredEventProcessor<B> onInvocation(DataProcessorInvocation graph,
+  public abstract ConfiguredEventProcessor<T> onInvocation(DataProcessorInvocation graph,
                                                            ProcessingElementParameterExtractor extractor);
 
   @Override
-  public StandaloneEventProcessorRuntime<B> getRuntime(DataProcessorInvocation graph,
+  public StandaloneEventProcessorRuntime<T> getRuntime(DataProcessorInvocation graph,
                                                        ProcessingElementParameterExtractor extractor,
                                                        ConfigExtractor configExtractor,
                                                        StreamPipesClient streamPipesClient) {
-    ConfiguredEventProcessor<B> configuredEngine = onInvocation(graph, extractor);
-    EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>
-            (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+    ConfiguredEventProcessor<T> configuredEngine = onInvocation(graph, extractor);
+    EventProcessorRuntimeParams<T> runtimeParams =
+        new EventProcessorRuntimeParams<> (
+            configuredEngine.getBindingParams(),
+            false,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneEventProcessorRuntime<>(configuredEngine.getEngineSupplier(),
-            runtimeParams);
+        runtimeParams);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java
index 2fcf23f40..74010a8a9 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java
@@ -28,22 +28,33 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
 
-public abstract class StandaloneEventProcessorDeclarerSingleton<B extends EventProcessorBindingParams>
-        extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
+@Deprecated(since = "0.90.0", forRemoval = true)
+/**
+ * @deprecated this class has no usages and is scheduled for removal
+ */
+public abstract class StandaloneEventProcessorDeclarerSingleton<T extends EventProcessorBindingParams>
+    extends EventProcessorDeclarer<T, StandaloneEventProcessorRuntime<T>> {
 
   @Override
-  public StandaloneEventProcessorRuntime<B> getRuntime(DataProcessorInvocation graph,
+  public StandaloneEventProcessorRuntime<T> getRuntime(DataProcessorInvocation graph,
                                                        ProcessingElementParameterExtractor extractor,
                                                        ConfigExtractor configExtractor,
                                                        StreamPipesClient streamPipesClient) {
 
-    ConfiguredEventProcessor<B> configuredEngine = onInvocation(graph, extractor);
-    EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>
-            (configuredEngine.getBindingParams(), true, configExtractor, streamPipesClient);
+    ConfiguredEventProcessor<T> configuredEngine = onInvocation(graph, extractor);
+    EventProcessorRuntimeParams<T> runtimeParams =
+        new EventProcessorRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            true,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneEventProcessorRuntime<>(configuredEngine.getEngineSupplier(),
-            runtimeParams);
+        runtimeParams);
   }
 
-  public abstract ConfiguredEventProcessor<B> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor);
+  public abstract ConfiguredEventProcessor<T> onInvocation(DataProcessorInvocation graph,
+                                                           ProcessingElementParameterExtractor extractor);
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java
index 048b857cc..115b21199 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java
@@ -28,22 +28,28 @@ import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventSinkRuntime;
 
-public abstract class StandaloneEventSinkDeclarer<B extends
-        EventSinkBindingParams> extends EventSinkDeclarer<B, StandaloneEventSinkRuntime<B>> {
+public abstract class StandaloneEventSinkDeclarer<T extends
+    EventSinkBindingParams> extends EventSinkDeclarer<T, StandaloneEventSinkRuntime<T>> {
 
   @Override
-  public StandaloneEventSinkRuntime<B> getRuntime(DataSinkInvocation graph,
+  public StandaloneEventSinkRuntime<T> getRuntime(DataSinkInvocation graph,
                                                   DataSinkParameterExtractor extractor,
                                                   ConfigExtractor configExtractor,
                                                   StreamPipesClient streamPipesClient) {
 
-    ConfiguredEventSink<B> configuredEngine = onInvocation(graph, extractor);
-    EventSinkRuntimeParams<B> runtimeParams = new EventSinkRuntimeParams<>
-            (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+    ConfiguredEventSink<T> configuredEngine = onInvocation(graph, extractor);
+    EventSinkRuntimeParams<T> runtimeParams =
+        new EventSinkRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            false,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneEventSinkRuntime<>(configuredEngine.getEngineSupplier(), runtimeParams);
   }
 
-  public abstract ConfiguredEventSink<B> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
+  public abstract ConfiguredEventSink<T> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
 
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java
index aa2a27b02..88d948b0f 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java
@@ -28,21 +28,27 @@ import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventSinkRuntime;
 
-public abstract class StandaloneEventSinkDeclarerSingleton<B extends
-        EventSinkBindingParams> extends EventSinkDeclarer<B, StandaloneEventSinkRuntime<B>> {
+public abstract class StandaloneEventSinkDeclarerSingleton<T extends
+    EventSinkBindingParams> extends EventSinkDeclarer<T, StandaloneEventSinkRuntime<T>> {
 
   @Override
-  public StandaloneEventSinkRuntime<B> getRuntime(DataSinkInvocation graph,
+  public StandaloneEventSinkRuntime<T> getRuntime(DataSinkInvocation graph,
                                                   DataSinkParameterExtractor extractor,
                                                   ConfigExtractor configExtractor,
                                                   StreamPipesClient streamPipesClient) {
 
-    ConfiguredEventSink<B> configuredEngine = onInvocation(graph, extractor);
-    EventSinkRuntimeParams<B> runtimeParams = new EventSinkRuntimeParams<>
-            (configuredEngine.getBindingParams(), true, configExtractor, streamPipesClient);
+    ConfiguredEventSink<T> configuredEngine = onInvocation(graph, extractor);
+    EventSinkRuntimeParams<T> runtimeParams =
+        new EventSinkRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            true,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneEventSinkRuntime<>(configuredEngine.getEngineSupplier(), runtimeParams);
   }
 
-  public abstract ConfiguredEventSink<B> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
+  public abstract ConfiguredEventSink<T> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java
index 8e020cf4e..efa85bf6e 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java
@@ -27,23 +27,29 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
 import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneExternalEventProcessorRuntime;
 
-public abstract class StandaloneExternalEventProcessingDeclarer<B extends
-        EventProcessorBindingParams> extends EventProcessorDeclarer<B,
-        StandaloneExternalEventProcessorRuntime<B>> {
+public abstract class StandaloneExternalEventProcessingDeclarer<T extends
+    EventProcessorBindingParams> extends EventProcessorDeclarer<T,
+    StandaloneExternalEventProcessorRuntime<T>> {
 
-  public abstract ConfiguredExternalEventProcessor<B> onInvocation(DataProcessorInvocation graph,
-                                                            ProcessingElementParameterExtractor extractor);
+  public abstract ConfiguredExternalEventProcessor<T> onInvocation(DataProcessorInvocation graph,
+                                                                   ProcessingElementParameterExtractor extractor);
 
   @Override
-  public StandaloneExternalEventProcessorRuntime<B> getRuntime(DataProcessorInvocation graph,
+  public StandaloneExternalEventProcessorRuntime<T> getRuntime(DataProcessorInvocation graph,
                                                                ProcessingElementParameterExtractor extractor,
                                                                ConfigExtractor configExtractor,
                                                                StreamPipesClient streamPipesClient) {
-    ConfiguredExternalEventProcessor<B> configuredEngine = onInvocation(graph, extractor);
-    EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>
-            (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+    ConfiguredExternalEventProcessor<T> configuredEngine = onInvocation(graph, extractor);
+    EventProcessorRuntimeParams<T> runtimeParams =
+        new EventProcessorRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            false,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneExternalEventProcessorRuntime<>(configuredEngine.getEngineSupplier(),
-            runtimeParams);
+        runtimeParams);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java
index 2815c08ec..3464955c8 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java
@@ -27,24 +27,35 @@ import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventSink;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneExternalEventSinkRuntime;
 
-public abstract class StandaloneExternalEventSinkDeclarer<B extends
-        EventSinkBindingParams> extends EventSinkDeclarer<B,
-                StandaloneExternalEventSinkRuntime<B>> {
+/**
+ * @deprecated since there is no usage
+ * @param <T>
+ */
+@Deprecated(since = "0.90.0", forRemoval = true)
+public abstract class StandaloneExternalEventSinkDeclarer<T extends
+    EventSinkBindingParams> extends EventSinkDeclarer<T,
+    StandaloneExternalEventSinkRuntime<T>> {
 
   @Override
-  public StandaloneExternalEventSinkRuntime<B> getRuntime(DataSinkInvocation graph,
+  public StandaloneExternalEventSinkRuntime<T> getRuntime(DataSinkInvocation graph,
                                                           DataSinkParameterExtractor extractor,
                                                           ConfigExtractor configExtractor,
                                                           StreamPipesClient streamPipesClient) {
 
-    ConfiguredExternalEventSink<B> configuredEngine = onInvocation(graph, extractor);
-    EventSinkRuntimeParams<B> runtimeParams = new EventSinkRuntimeParams<>
-            (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+    ConfiguredExternalEventSink<T> configuredEngine = onInvocation(graph, extractor);
+    EventSinkRuntimeParams<T> runtimeParams =
+        new EventSinkRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            false,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneExternalEventSinkRuntime<>(configuredEngine.getEngineSupplier(),
-            runtimeParams);
+        runtimeParams);
   }
 
-  public abstract ConfiguredExternalEventSink<B> onInvocation(DataSinkInvocation graph,
+  public abstract ConfiguredExternalEventSink<T> onInvocation(DataSinkInvocation graph,
                                                               DataSinkParameterExtractor extractor);
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index d9e9aaa50..9373aed89 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -35,6 +35,7 @@ import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.wrapper.routing.RawDataProcessor;
 import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,9 +48,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclarer, RawDataProcessor {
 
   private static final Logger LOG = LoggerFactory.getLogger(StreamPipesFunction.class);
-  private Map<String, SpInputCollector> inputCollectors;
   private final Map<String, SourceInfo> sourceInfoMapper;
   private final Map<String, SchemaInfo> schemaInfoMapper;
+  private Map<String, SpInputCollector> inputCollectors;
 
   public StreamPipesFunction() {
     this.sourceInfoMapper = new HashMap<>();
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
index 299be5780..19c9b1b79 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
@@ -18,31 +18,32 @@
 
 package org.apache.streampipes.wrapper.standalone.manager;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.grounding.TransportFormat;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.wrapper.standalone.routing.StandaloneSpInputCollector;
 import org.apache.streampipes.wrapper.standalone.routing.StandaloneSpOutputCollector;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashMap;
 import java.util.Map;
 
 public class ProtocolManager {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ProtocolManager.class);
   public static Map<String, StandaloneSpInputCollector> consumers = new HashMap<>();
   public static Map<String, StandaloneSpOutputCollector> producers = new HashMap<>();
 
-  private static final Logger LOG = LoggerFactory.getLogger(ProtocolManager.class);
-
   // TODO currently only the topic name is used as an identifier for a consumer/producer. Should
   // be changed by some hashCode implementation in streampipes-model, but this requires changes
   // in empire serializers
 
   public static <T extends TransportProtocol> StandaloneSpInputCollector findInputCollector(T protocol,
                                                                                             TransportFormat format,
-                                                                                            Boolean singletonEngine) throws SpRuntimeException {
+                                                                                            Boolean singletonEngine)
+      throws SpRuntimeException {
 
     if (consumers.containsKey(topicName(protocol))) {
       return consumers.get(topicName(protocol));
@@ -56,14 +57,15 @@ public class ProtocolManager {
 
   public static <T extends TransportProtocol> StandaloneSpOutputCollector findOutputCollector(T protocol,
                                                                                               TransportFormat format,
-                                                                                              String resourceId) throws SpRuntimeException {
+                                                                                              String resourceId)
+      throws SpRuntimeException {
 
     if (producers.containsKey(topicName(protocol))) {
       return producers.get(topicName(protocol));
     } else {
       producers.put(topicName(protocol), makeOutputCollector(protocol, format, resourceId));
       LOG.info("Adding new producer to producer map (size=" + producers.size() + "): " + topicName
-              (protocol));
+          (protocol));
       return producers.get(topicName(protocol));
     }
 
@@ -71,13 +73,15 @@ public class ProtocolManager {
 
   private static <T extends TransportProtocol> StandaloneSpInputCollector<T> makeInputCollector(T protocol,
                                                                                                 TransportFormat format,
-                                                                                                Boolean singletonEngine) throws SpRuntimeException {
+                                                                                                Boolean singletonEngine)
+      throws SpRuntimeException {
     return new StandaloneSpInputCollector<>(protocol, format, singletonEngine);
   }
 
   public static <T extends TransportProtocol> StandaloneSpOutputCollector<T> makeOutputCollector(T protocol,
                                                                                                  TransportFormat format,
-                                                                                                 String resourceId) throws SpRuntimeException {
+                                                                                                 String resourceId)
+      throws SpRuntimeException {
     return new StandaloneSpOutputCollector<>(protocol, format, resourceId);
   }
 
@@ -86,17 +90,17 @@ public class ProtocolManager {
   }
 
   public static <T extends TransportProtocol> void removeInputCollector(T protocol) throws
-          SpRuntimeException {
+      SpRuntimeException {
     consumers.remove(topicName(protocol));
     LOG.info("Removing consumer from consumer map (size=" + consumers.size() + "): " + topicName
-            (protocol));
+        (protocol));
   }
 
   public static <T extends TransportProtocol> void removeOutputCollector(T protocol) throws
-          SpRuntimeException {
+      SpRuntimeException {
     producers.remove(topicName(protocol));
     LOG.info("Removing producer from producer map (size=" + producers.size() + "): " + topicName
-            (protocol));
+        (protocol));
   }
 
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java
index cc7c73818..35dc44304 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java
@@ -29,10 +29,10 @@ import org.apache.streampipes.wrapper.standalone.manager.PManager;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public abstract class StandaloneSpCollector<T extends TransportProtocol, C> implements
-        PipelineElementCollector<C> {
+public abstract class StandaloneSpCollector<T extends TransportProtocol, W> implements
+    PipelineElementCollector<W> {
 
-  protected Map<String, C> consumers;
+  protected Map<String, W> consumers;
 
   protected T transportProtocol;
   protected SpProtocolDefinition<T> protocolDefinition;
@@ -45,15 +45,15 @@ public abstract class StandaloneSpCollector<T extends TransportProtocol, C> impl
   public StandaloneSpCollector(T protocol, TransportFormat format) throws SpRuntimeException {
     this.transportProtocol = protocol;
     this.protocolDefinition = PManager.getProtocolDefinition(protocol).orElseThrow(() -> new
-            SpRuntimeException("Could not find protocol"));
+        SpRuntimeException("Could not find protocol"));
     this.transportFormat = format;
     this.dataFormatDefinition = PManager.getDataFormat(format).orElseThrow(() -> new
-            SpRuntimeException("Could not find format"));
+        SpRuntimeException("Could not find format"));
     this.consumers = new ConcurrentHashMap<>();
     this.topic = transportProtocol.getTopicDefinition().getActualTopicName();
   }
 
-  public void registerConsumer(String routeId, C consumer) {
+  public void registerConsumer(String routeId, W consumer) {
     consumers.put(routeId, consumer);
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
index 77277c6c1..eca3b78bd 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
@@ -27,9 +27,9 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
 
 public class StandaloneSpInputCollector<T extends TransportProtocol> extends
-        StandaloneSpCollector<T, RawDataProcessor>
-        implements
-        InternalEventProcessor<byte[]>, SpInputCollector {
+    StandaloneSpCollector<T, RawDataProcessor>
+    implements
+    InternalEventProcessor<byte[]>, SpInputCollector {
 
   private final Boolean singletonEngine;
 
@@ -42,20 +42,20 @@ public class StandaloneSpInputCollector<T extends TransportProtocol> extends
   @Override
   public void onEvent(byte[] event) {
     if (singletonEngine) {
-     send(consumers.get(consumers.keySet().toArray()[0]), event);
+      send(consumers.get(consumers.keySet().toArray()[0]), event);
     } else {
       consumers.forEach((key, value) -> send(value, event));
     }
   }
 
   private void send(RawDataProcessor rawDataProcessor, byte[] event) {
-      rawDataProcessor.process(dataFormatDefinition.toMap(event), topic);
+    rawDataProcessor.process(dataFormatDefinition.toMap(event), topic);
   }
 
   @Override
   public void connect() throws SpRuntimeException {
     if (!protocolDefinition.getConsumer().isConnected()) {
-      protocolDefinition.getConsumer().connect(transportProtocol,this);
+      protocolDefinition.getConsumer().connect(transportProtocol, this);
     }
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java
index d658e58a3..c6a55a3cc 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java
@@ -30,14 +30,15 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.EventConverter;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 public class StandaloneSpOutputCollector<T extends TransportProtocol> extends
-        StandaloneSpCollector<T, InternalEventProcessor<Map<String,
-                Object>>> implements SpOutputCollector {
+    StandaloneSpCollector<T, InternalEventProcessor<Map<String,
+        Object>>> implements SpOutputCollector {
 
   private static final Logger LOG = LoggerFactory.getLogger(StandaloneSpOutputCollector.class);
 
@@ -47,9 +48,9 @@ public class StandaloneSpOutputCollector<T extends TransportProtocol> extends
   public StandaloneSpOutputCollector(T protocol,
                                      TransportFormat format,
                                      String resourceId) throws SpRuntimeException {
-   super(protocol, format);
-   this.producer = protocolDefinition.getProducer();
-   this.resourceId = resourceId;
+    super(protocol, format);
+    this.producer = protocolDefinition.getProducer();
+    this.resourceId = resourceId;
   }
 
   public void collect(Event event) {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
index 60763cdc5..4d0a686d1 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
@@ -27,22 +27,23 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.function.Supplier;
 
-public class StandaloneEventProcessorRuntime<B extends EventProcessorBindingParams> extends
-    StandalonePipelineElementRuntime<B, DataProcessorInvocation,
-        EventProcessorRuntimeParams<B>, EventProcessorRuntimeContext, EventProcessor<B>> {
+public class StandaloneEventProcessorRuntime<T extends EventProcessorBindingParams> extends
+    StandalonePipelineElementRuntime<T, DataProcessorInvocation,
+        EventProcessorRuntimeParams<T>, EventProcessorRuntimeContext, EventProcessor<T>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(StandaloneEventProcessorRuntime.class);
 
   protected SpOutputCollector outputCollector;
 
-  public StandaloneEventProcessorRuntime(Supplier<EventProcessor<B>> supplier,
-                                         EventProcessorRuntimeParams<B> params) {
+  public StandaloneEventProcessorRuntime(Supplier<EventProcessor<T>> supplier,
+                                         EventProcessorRuntimeParams<T> params) {
     super(supplier, params);
     this.outputCollector = getOutputCollector();
   }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
index d706a2397..5d5156b2a 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
@@ -25,19 +25,20 @@ import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.runtime.EventSink;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.function.Supplier;
 
-public class StandaloneEventSinkRuntime<B extends EventSinkBindingParams> extends
-    StandalonePipelineElementRuntime<B, DataSinkInvocation,
-        EventSinkRuntimeParams<B>, EventSinkRuntimeContext, EventSink<B>> {
+public class StandaloneEventSinkRuntime<T extends EventSinkBindingParams> extends
+    StandalonePipelineElementRuntime<T, DataSinkInvocation,
+        EventSinkRuntimeParams<T>, EventSinkRuntimeContext, EventSink<T>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(StandaloneEventSinkRuntime.class);
 
-  public StandaloneEventSinkRuntime(Supplier<EventSink<B>> supplier, EventSinkRuntimeParams<B>
+  public StandaloneEventSinkRuntime(Supplier<EventSink<T>> supplier, EventSinkRuntimeParams<T>
       params) {
     super(supplier, params);
   }
@@ -55,7 +56,7 @@ public class StandaloneEventSinkRuntime<B extends EventSinkBindingParams> extend
       monitoringManager.increaseInCounter(resourceId, sourceInfo, System.currentTimeMillis());
       engine.onEvent(params.makeEvent(rawEvent, sourceInfo));
     } catch (RuntimeException e) {
-      LOG.error("RuntimeException while processing event in {}", engine.getClass().getCanonicalName() , e);
+      LOG.error("RuntimeException while processing event in {}", engine.getClass().getCanonicalName(), e);
       addLogEntry(e);
     }
   }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java
index 01c01878d..5e0dd840f 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java
@@ -27,12 +27,12 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
 import java.util.Map;
 import java.util.function.Supplier;
 
-public class StandaloneExternalEventProcessorRuntime<B extends EventProcessorBindingParams> extends
-        StandalonePipelineElementRuntime<B, DataProcessorInvocation,
-                EventProcessorRuntimeParams<B>, EventProcessorRuntimeContext, ExternalEventProcessor<B>> {
+public class StandaloneExternalEventProcessorRuntime<T extends EventProcessorBindingParams> extends
+    StandalonePipelineElementRuntime<T, DataProcessorInvocation,
+        EventProcessorRuntimeParams<T>, EventProcessorRuntimeContext, ExternalEventProcessor<T>> {
 
-  public StandaloneExternalEventProcessorRuntime(Supplier<ExternalEventProcessor<B>> supplier,
-                                                 EventProcessorRuntimeParams<B> runtimeParams) {
+  public StandaloneExternalEventProcessorRuntime(Supplier<ExternalEventProcessor<T>> supplier,
+                                                 EventProcessorRuntimeParams<T> runtimeParams) {
     super(supplier, runtimeParams);
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java
index 418702da1..de5f4c881 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java
@@ -27,11 +27,12 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventSink;
 import java.util.Map;
 import java.util.function.Supplier;
 
-public class StandaloneExternalEventSinkRuntime<B extends EventSinkBindingParams> extends
-        StandalonePipelineElementRuntime<B, DataSinkInvocation,
-                EventSinkRuntimeParams<B>, EventSinkRuntimeContext, ExternalEventSink<B>> {
+public class StandaloneExternalEventSinkRuntime<T extends EventSinkBindingParams> extends
+    StandalonePipelineElementRuntime<T, DataSinkInvocation,
+        EventSinkRuntimeParams<T>, EventSinkRuntimeContext, ExternalEventSink<T>> {
 
-  public StandaloneExternalEventSinkRuntime(Supplier<ExternalEventSink<B>> supplier, EventSinkRuntimeParams<B> runtimeParams) {
+  public StandaloneExternalEventSinkRuntime(Supplier<ExternalEventSink<T>> supplier,
+                                            EventSinkRuntimeParams<T> runtimeParams) {
     super(supplier, runtimeParams);
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
index 4a046f33e..eab751d27 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
@@ -36,19 +36,19 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Supplier;
 
-public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I>,
-        I extends InvocableStreamPipesEntity,
-        RP extends RuntimeParams<B, I, RC>,
-        RC extends RuntimeContext,
-        P extends PipelineElement<B, I>>
-        extends PipelineElementRuntime implements RawDataProcessor {
+public abstract class StandalonePipelineElementRuntime<T extends BindingParams<K>,
+    K extends InvocableStreamPipesEntity,
+    V extends RuntimeParams<T, K, X>,
+    X extends RuntimeContext,
+    PeT extends PipelineElement<T, K>>
+    extends PipelineElementRuntime implements RawDataProcessor {
 
-  protected RP params;
-  protected final P engine;
+  protected final PeT engine;
+  protected V params;
   protected SpMonitoringManager monitoringManager;
   protected String resourceId;
 
-  public StandalonePipelineElementRuntime(Supplier<P> supplier, RP runtimeParams) {
+  public StandalonePipelineElementRuntime(Supplier<PeT> supplier, V runtimeParams) {
     super();
     this.engine = supplier.get();
     this.params = runtimeParams;
@@ -56,7 +56,7 @@ public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I
     this.resourceId = params.getBindingParams().getGraph().getElementId();
   }
 
-  public P getEngine() {
+  public PeT getEngine() {
     return engine;
   }
 
@@ -69,8 +69,8 @@ public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I
     List<SpInputCollector> inputCollectors = new ArrayList<>();
     for (SpDataStream is : params.getBindingParams().getGraph().getInputStreams()) {
       inputCollectors.add(ProtocolManager.findInputCollector(is.getEventGrounding()
-                      .getTransportProtocol(), is.getEventGrounding().getTransportFormats().get(0),
-              params.isSingletonEngine()));
+              .getTransportProtocol(), is.getEventGrounding().getTransportFormats().get(0),
+          params.isSingletonEngine()));
     }
     return inputCollectors;
   }