You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/22 20:09:12 UTC

[streampipes] branch chore/add-checkstyle created (now 94e59eb10)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a change to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git


      at 94e59eb10 add checkstyle to streampipes-wrapper-python

This branch includes the following new commits:

     new 94c3f2dff remove suppression for vocabulary
     new 0ae87fd29 add checkstyle to streampipes-wrapper
     new 3e9ffeb97 add checkstyle to streampipes-wrapper-standalone
     new 0bcd4f06b add checkstyle to streampipes-wrapper-distributed
     new 1e73b24fd add checkstyle to streampipes-wrapper-flink
     new 17622e6af add checkstyle to streampipes-wrapper-kafka-streams
     new 94e59eb10 add checkstyle to streampipes-wrapper-python

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[streampipes] 03/07: add checkstyle to streampipes-wrapper-standalone

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 3e9ffeb974b2afaa4493ac7490e0fcc4d12945de
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:00:58 2022 +0100

    add checkstyle to streampipes-wrapper-standalone
---
 streampipes-wrapper-standalone/pom.xml             |  43 ++--
 .../AbstractConfiguredPipelineElement.java         |  14 +-
 .../standalone/ConfiguredEventProcessor.java       |   6 +-
 .../wrapper/standalone/ConfiguredEventSink.java    |   5 +-
 .../ConfiguredExternalEventProcessor.java          |   8 +-
 .../standalone/ConfiguredExternalEventSink.java    |   6 +-
 .../standalone/StreamPipesDataProcessor.java       |   6 +-
 .../wrapper/standalone/StreamPipesDataSink.java    |   3 +-
 .../StreamPipesExternalDataProcessor.java          | 222 +++++++++++----------
 .../StandaloneEventProcessingDeclarer.java         |  21 +-
 .../StandaloneEventProcessorDeclarerSingleton.java |  27 ++-
 .../declarer/StandaloneEventSinkDeclarer.java      |  20 +-
 .../StandaloneEventSinkDeclarerSingleton.java      |  20 +-
 .../StandaloneExternalEventProcessingDeclarer.java |  26 ++-
 .../StandaloneExternalEventSinkDeclarer.java       |  29 ++-
 .../standalone/function/StreamPipesFunction.java   |   3 +-
 .../standalone/manager/ProtocolManager.java        |  30 +--
 .../standalone/routing/StandaloneSpCollector.java  |  12 +-
 .../routing/StandaloneSpInputCollector.java        |  12 +-
 .../routing/StandaloneSpOutputCollector.java       |  11 +-
 .../runtime/StandaloneEventProcessorRuntime.java   |  11 +-
 .../runtime/StandaloneEventSinkRuntime.java        |  11 +-
 .../StandaloneExternalEventProcessorRuntime.java   |  10 +-
 .../StandaloneExternalEventSinkRuntime.java        |   9 +-
 .../runtime/StandalonePipelineElementRuntime.java  |  24 +--
 25 files changed, 329 insertions(+), 260 deletions(-)

diff --git a/streampipes-wrapper-standalone/pom.xml b/streampipes-wrapper-standalone/pom.xml
index df12f29d9..a873cb7eb 100644
--- a/streampipes-wrapper-standalone/pom.xml
+++ b/streampipes-wrapper-standalone/pom.xml
@@ -16,23 +16,32 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.streampipes</groupId>
-		<artifactId>streampipes-parent</artifactId>
-		<version>0.91.0-SNAPSHOT</version>
-	</parent>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampipes</groupId>
+        <artifactId>streampipes-parent</artifactId>
+        <version>0.91.0-SNAPSHOT</version>
+    </parent>
 
-	<artifactId>streampipes-wrapper-standalone</artifactId>
-	<name>StreamPipes Wrapper for Standalone Pipeline Element Implementations</name>
+    <artifactId>streampipes-wrapper-standalone</artifactId>
+    <name>StreamPipes Wrapper for Standalone Pipeline Element Implementations</name>
 
-	<dependencies>
-		<!-- StreamPipes dependencies -->
-		<dependency>
-			<groupId>org.apache.streampipes</groupId>
-			<artifactId>streampipes-wrapper</artifactId>
-			<version>0.91.0-SNAPSHOT</version>
-		</dependency>
-	</dependencies>
+    <dependencies>
+        <!-- StreamPipes dependencies -->
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-wrapper</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java
index 0d1a189b8..01c42ba73 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java
@@ -23,20 +23,20 @@ import org.apache.streampipes.wrapper.runtime.PipelineElement;
 
 import java.util.function.Supplier;
 
-public class AbstractConfiguredPipelineElement<I extends InvocableStreamPipesEntity,
-        B extends BindingParams<I>,
-        T extends PipelineElement<B, I>> {
+public class AbstractConfiguredPipelineElement<K extends InvocableStreamPipesEntity,
+    V extends BindingParams<K>,
+    T extends PipelineElement<V, K>> {
 
-  private B bindingParams;
+  private V bindingParams;
   private Supplier<T> engineSupplier;
 
-  public AbstractConfiguredPipelineElement(B bindingParams,
-                                          Supplier<T> engineSupplier) {
+  public AbstractConfiguredPipelineElement(V bindingParams,
+                                           Supplier<T> engineSupplier) {
     this.bindingParams = bindingParams;
     this.engineSupplier = engineSupplier;
   }
 
-  public B getBindingParams() {
+  public V getBindingParams() {
     return bindingParams;
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java
index 551956e76..d4e77a9b2 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java
@@ -24,10 +24,10 @@ import org.apache.streampipes.wrapper.runtime.EventProcessor;
 
 import java.util.function.Supplier;
 
-public class ConfiguredEventProcessor<B extends EventProcessorBindingParams>
-        extends AbstractConfiguredPipelineElement<DataProcessorInvocation, B, EventProcessor<B>> {
+public class ConfiguredEventProcessor<T extends EventProcessorBindingParams>
+    extends AbstractConfiguredPipelineElement<DataProcessorInvocation, T, EventProcessor<T>> {
 
-  public ConfiguredEventProcessor(B bindingParams, Supplier<EventProcessor<B>> engineSupplier) {
+  public ConfiguredEventProcessor(T bindingParams, Supplier<EventProcessor<T>> engineSupplier) {
     super(bindingParams, engineSupplier);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java
index fbc8b05d9..f6a8e2b49 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java
@@ -24,9 +24,10 @@ import org.apache.streampipes.wrapper.runtime.EventSink;
 
 import java.util.function.Supplier;
 
-public class ConfiguredEventSink<B extends EventSinkBindingParams> extends AbstractConfiguredPipelineElement<DataSinkInvocation, B, EventSink<B>> {
+public class ConfiguredEventSink<T extends EventSinkBindingParams>
+    extends AbstractConfiguredPipelineElement<DataSinkInvocation, T, EventSink<T>> {
 
-  public ConfiguredEventSink(B bindingParams, Supplier<EventSink<B>> engineSupplier) {
+  public ConfiguredEventSink(T bindingParams, Supplier<EventSink<T>> engineSupplier) {
     super(bindingParams, engineSupplier);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java
index 147b11fcb..67fe96048 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java
@@ -23,11 +23,11 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
 
 import java.util.function.Supplier;
 
-public class ConfiguredExternalEventProcessor<B extends EventProcessorBindingParams>
-        extends AbstractConfiguredPipelineElement<DataProcessorInvocation, B,
-        ExternalEventProcessor<B>> {
+public class ConfiguredExternalEventProcessor<T extends EventProcessorBindingParams>
+    extends AbstractConfiguredPipelineElement<DataProcessorInvocation, T,
+    ExternalEventProcessor<T>> {
 
-  public ConfiguredExternalEventProcessor(B bindingParams, Supplier<ExternalEventProcessor<B>> engineSupplier) {
+  public ConfiguredExternalEventProcessor(T bindingParams, Supplier<ExternalEventProcessor<T>> engineSupplier) {
     super(bindingParams, engineSupplier);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java
index 38418fba3..9abf7b413 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java
@@ -23,10 +23,10 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventSink;
 
 import java.util.function.Supplier;
 
-public class ConfiguredExternalEventSink<B extends EventSinkBindingParams>
-        extends AbstractConfiguredPipelineElement<DataSinkInvocation, B, ExternalEventSink<B>> {
+public class ConfiguredExternalEventSink<T extends EventSinkBindingParams>
+    extends AbstractConfiguredPipelineElement<DataSinkInvocation, T, ExternalEventSink<T>> {
 
-  public ConfiguredExternalEventSink(B bindingParams, Supplier<ExternalEventSink<B>> engineSupplier) {
+  public ConfiguredExternalEventSink(T bindingParams, Supplier<ExternalEventSink<T>> engineSupplier) {
     super(bindingParams, engineSupplier);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
index f5dde0257..f938e413d 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
@@ -24,10 +24,12 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
 
 import java.util.function.Supplier;
 
-public abstract class StreamPipesDataProcessor extends StandaloneEventProcessingDeclarer<ProcessorParams> implements EventProcessor<ProcessorParams> {
+public abstract class StreamPipesDataProcessor extends StandaloneEventProcessingDeclarer<ProcessorParams>
+    implements EventProcessor<ProcessorParams> {
 
   @Override
-  public ConfiguredEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+  public ConfiguredEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
+                                                                ProcessingElementParameterExtractor extractor) {
     Supplier<EventProcessor<ProcessorParams>> supplier = () -> this;
     return new ConfiguredEventProcessor<>(new ProcessorParams(graph), supplier);
   }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
index b087b0b2b..5fa3dc4af 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
@@ -24,7 +24,8 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDec
 
 import java.util.function.Supplier;
 
-public abstract class StreamPipesDataSink extends StandaloneEventSinkDeclarer<SinkParams> implements EventSink<SinkParams> {
+public abstract class StreamPipesDataSink extends StandaloneEventSinkDeclarer<SinkParams>
+    implements EventSink<SinkParams> {
 
   @Override
   public ConfiguredEventSink<SinkParams> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
index 1992905c3..3b29d8a62 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
@@ -17,9 +17,6 @@
  */
 package org.apache.streampipes.wrapper.standalone;
 
-import com.google.gson.JsonObject;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -27,119 +24,124 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
 import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
 
+import com.google.gson.JsonObject;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+
 import java.io.IOException;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.Supplier;
 
-public abstract class StreamPipesExternalDataProcessor extends StandaloneExternalEventProcessingDeclarer<ProcessorParams>
-        implements ExternalEventProcessor<ProcessorParams> {
-
-    // endpoint of Python processor runs here
-    private static final String HTTP_PROTOCOL = "http://";
-    private static final String PYTHON_ENDPOINT = "localhost:5000";
-
-    private String invocationId;
-    private String appId;
-    private String inputTopic;
-    private String outputTopic;
-    private String kafkaUrl;
-
-    @Override
-    public ConfiguredExternalEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
-                                                                          ProcessingElementParameterExtractor extractor) {
-        EventProcessorBindingParams params = new ProcessorParams(graph);
-        invocationId = UUID.randomUUID().toString();
-        appId = graph.getAppId();
-        inputTopic = getInputTopic(params);
-        outputTopic = getOutputTopic(params);
-        kafkaUrl = getKafkaUrl(params);
-
-        Supplier<ExternalEventProcessor<ProcessorParams>> supplier = () -> this;
-        return new ConfiguredExternalEventProcessor<>(new ProcessorParams(graph), supplier);
-    }
-
-    protected JsonObject createMinimalInvocationGraph(Map<String, String> staticPropertyMap) {
-        JsonObject json = new JsonObject();
-
-        json.addProperty("invocation_id", invocationId);
-        json.addProperty("processor_id", appId);
-        json.addProperty("input_topics", inputTopic);
-        json.addProperty("output_topics", outputTopic);
-        json.addProperty("bootstrap_servers", kafkaUrl);
-
-        JsonObject staticProperties = new JsonObject();
-        staticPropertyMap.forEach(staticProperties::addProperty);
-        json.add("static_properties", staticProperties);
-
-        return json;
-    }
-
-    protected void invoke(JsonObject json) {
-        post("invoke", json.toString());
-    }
-
-    protected void detach () {
-        JsonObject json = new JsonObject();
-        json.addProperty("invocation_id", invocationId);
-        post("detach", json.toString());
-    }
-
-    private static String post(String endpoint, String payload) {
-        String responseString = null;
-
-        try {
-            responseString = Request.Post(HTTP_PROTOCOL + PYTHON_ENDPOINT + "/" + endpoint)
-                    .bodyString(payload, ContentType.APPLICATION_JSON)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
-                    .execute().returnContent().asString();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return responseString;
-    }
-
-    private String getInputTopic(EventProcessorBindingParams parameters) {
-        return parameters
-                .getGraph()
-                .getInputStreams()
-                .get(0)
-                .getEventGrounding()
-                .getTransportProtocol()
-                .getTopicDefinition()
-                .getActualTopicName();
-    }
-
-    private String getOutputTopic(EventProcessorBindingParams parameters) {
-        return parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocol()
-                .getTopicDefinition()
-                .getActualTopicName();
-    }
-
-    private String getKafkaUrl(EventProcessorBindingParams parameters) {
-        String brokerHostname = parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocols()
-                .get(0)
-                .getBrokerHostname();
-
-        Integer kafkaPort = ((KafkaTransportProtocol) parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocols()
-                .get(0))
-                .getKafkaPort();
-
-        return brokerHostname + ":" + kafkaPort.toString();
+public abstract class StreamPipesExternalDataProcessor
+    extends StandaloneExternalEventProcessingDeclarer<ProcessorParams>
+    implements ExternalEventProcessor<ProcessorParams> {
+
+  // endpoint of Python processor runs here
+  private static final String HTTP_PROTOCOL = "http://";
+  private static final String PYTHON_ENDPOINT = "localhost:5000";
+
+  private String invocationId;
+  private String appId;
+  private String inputTopic;
+  private String outputTopic;
+  private String kafkaUrl;
+
+  private static String post(String endpoint, String payload) {
+    String responseString = null;
+
+    try {
+      responseString = Request.Post(HTTP_PROTOCOL + PYTHON_ENDPOINT + "/" + endpoint)
+          .bodyString(payload, ContentType.APPLICATION_JSON)
+          .connectTimeout(1000)
+          .socketTimeout(100000)
+          .execute().returnContent().asString();
+    } catch (IOException e) {
+      e.printStackTrace();
     }
+    return responseString;
+  }
+
+  @Override
+  public ConfiguredExternalEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
+                                                                        ProcessingElementParameterExtractor extractor) {
+    EventProcessorBindingParams params = new ProcessorParams(graph);
+    invocationId = UUID.randomUUID().toString();
+    appId = graph.getAppId();
+    inputTopic = getInputTopic(params);
+    outputTopic = getOutputTopic(params);
+    kafkaUrl = getKafkaUrl(params);
+
+    Supplier<ExternalEventProcessor<ProcessorParams>> supplier = () -> this;
+    return new ConfiguredExternalEventProcessor<>(new ProcessorParams(graph), supplier);
+  }
+
+  protected JsonObject createMinimalInvocationGraph(Map<String, String> staticPropertyMap) {
+    JsonObject json = new JsonObject();
+
+    json.addProperty("invocation_id", invocationId);
+    json.addProperty("processor_id", appId);
+    json.addProperty("input_topics", inputTopic);
+    json.addProperty("output_topics", outputTopic);
+    json.addProperty("bootstrap_servers", kafkaUrl);
+
+    JsonObject staticProperties = new JsonObject();
+    staticPropertyMap.forEach(staticProperties::addProperty);
+    json.add("static_properties", staticProperties);
+
+    return json;
+  }
+
+  protected void invoke(JsonObject json) {
+    post("invoke", json.toString());
+  }
+
+  protected void detach() {
+    JsonObject json = new JsonObject();
+    json.addProperty("invocation_id", invocationId);
+    post("detach", json.toString());
+  }
+
+  private String getInputTopic(EventProcessorBindingParams parameters) {
+    return parameters
+        .getGraph()
+        .getInputStreams()
+        .get(0)
+        .getEventGrounding()
+        .getTransportProtocol()
+        .getTopicDefinition()
+        .getActualTopicName();
+  }
+
+  private String getOutputTopic(EventProcessorBindingParams parameters) {
+    return parameters
+        .getGraph()
+        .getOutputStream()
+        .getEventGrounding()
+        .getTransportProtocol()
+        .getTopicDefinition()
+        .getActualTopicName();
+  }
+
+  private String getKafkaUrl(EventProcessorBindingParams parameters) {
+    String brokerHostname = parameters
+        .getGraph()
+        .getOutputStream()
+        .getEventGrounding()
+        .getTransportProtocols()
+        .get(0)
+        .getBrokerHostname();
+
+    Integer kafkaPort = ((KafkaTransportProtocol) parameters
+        .getGraph()
+        .getOutputStream()
+        .getEventGrounding()
+        .getTransportProtocols()
+        .get(0))
+        .getKafkaPort();
+
+    return brokerHostname + ":" + kafkaPort.toString();
+  }
 
 }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
index 7bbc6d84c..d52cde04c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
@@ -29,22 +29,27 @@ import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
 
 @Deprecated(since = "0.70.0", forRemoval = true)
-public abstract class StandaloneEventProcessingDeclarer<B extends
-        EventProcessorBindingParams> extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
+public abstract class StandaloneEventProcessingDeclarer<T extends
+    EventProcessorBindingParams> extends EventProcessorDeclarer<T, StandaloneEventProcessorRuntime<T>> {
 
-  public abstract ConfiguredEventProcessor<B> onInvocation(DataProcessorInvocation graph,
+  public abstract ConfiguredEventProcessor<T> onInvocation(DataProcessorInvocation graph,
                                                            ProcessingElementParameterExtractor extractor);
 
   @Override
-  public StandaloneEventProcessorRuntime<B> getRuntime(DataProcessorInvocation graph,
+  public StandaloneEventProcessorRuntime<T> getRuntime(DataProcessorInvocation graph,
                                                        ProcessingElementParameterExtractor extractor,
                                                        ConfigExtractor configExtractor,
                                                        StreamPipesClient streamPipesClient) {
-    ConfiguredEventProcessor<B> configuredEngine = onInvocation(graph, extractor);
-    EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>
-            (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+    ConfiguredEventProcessor<T> configuredEngine = onInvocation(graph, extractor);
+    EventProcessorRuntimeParams<T> runtimeParams =
+        new EventProcessorRuntimeParams<> (
+            configuredEngine.getBindingParams(),
+            false,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneEventProcessorRuntime<>(configuredEngine.getEngineSupplier(),
-            runtimeParams);
+        runtimeParams);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java
index 2fcf23f40..74010a8a9 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java
@@ -28,22 +28,33 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
 
-public abstract class StandaloneEventProcessorDeclarerSingleton<B extends EventProcessorBindingParams>
-        extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
+@Deprecated(since = "0.90.0", forRemoval = true)
+/**
+ * @deprecated: since there is no usage
+ */
+public abstract class StandaloneEventProcessorDeclarerSingleton<T extends EventProcessorBindingParams>
+    extends EventProcessorDeclarer<T, StandaloneEventProcessorRuntime<T>> {
 
   @Override
-  public StandaloneEventProcessorRuntime<B> getRuntime(DataProcessorInvocation graph,
+  public StandaloneEventProcessorRuntime<T> getRuntime(DataProcessorInvocation graph,
                                                        ProcessingElementParameterExtractor extractor,
                                                        ConfigExtractor configExtractor,
                                                        StreamPipesClient streamPipesClient) {
 
-    ConfiguredEventProcessor<B> configuredEngine = onInvocation(graph, extractor);
-    EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>
-            (configuredEngine.getBindingParams(), true, configExtractor, streamPipesClient);
+    ConfiguredEventProcessor<T> configuredEngine = onInvocation(graph, extractor);
+    EventProcessorRuntimeParams<T> runtimeParams =
+        new EventProcessorRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            true,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneEventProcessorRuntime<>(configuredEngine.getEngineSupplier(),
-            runtimeParams);
+        runtimeParams);
   }
 
-  public abstract ConfiguredEventProcessor<B> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor);
+  public abstract ConfiguredEventProcessor<T> onInvocation(DataProcessorInvocation graph,
+                                                           ProcessingElementParameterExtractor extractor);
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java
index 048b857cc..115b21199 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java
@@ -28,22 +28,28 @@ import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventSinkRuntime;
 
-public abstract class StandaloneEventSinkDeclarer<B extends
-        EventSinkBindingParams> extends EventSinkDeclarer<B, StandaloneEventSinkRuntime<B>> {
+public abstract class StandaloneEventSinkDeclarer<T extends
+    EventSinkBindingParams> extends EventSinkDeclarer<T, StandaloneEventSinkRuntime<T>> {
 
   @Override
-  public StandaloneEventSinkRuntime<B> getRuntime(DataSinkInvocation graph,
+  public StandaloneEventSinkRuntime<T> getRuntime(DataSinkInvocation graph,
                                                   DataSinkParameterExtractor extractor,
                                                   ConfigExtractor configExtractor,
                                                   StreamPipesClient streamPipesClient) {
 
-    ConfiguredEventSink<B> configuredEngine = onInvocation(graph, extractor);
-    EventSinkRuntimeParams<B> runtimeParams = new EventSinkRuntimeParams<>
-            (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+    ConfiguredEventSink<T> configuredEngine = onInvocation(graph, extractor);
+    EventSinkRuntimeParams<T> runtimeParams =
+        new EventSinkRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            false,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneEventSinkRuntime<>(configuredEngine.getEngineSupplier(), runtimeParams);
   }
 
-  public abstract ConfiguredEventSink<B> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
+  public abstract ConfiguredEventSink<T> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
 
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java
index aa2a27b02..88d948b0f 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java
@@ -28,21 +28,27 @@ import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventSinkRuntime;
 
-public abstract class StandaloneEventSinkDeclarerSingleton<B extends
-        EventSinkBindingParams> extends EventSinkDeclarer<B, StandaloneEventSinkRuntime<B>> {
+public abstract class StandaloneEventSinkDeclarerSingleton<T extends
+    EventSinkBindingParams> extends EventSinkDeclarer<T, StandaloneEventSinkRuntime<T>> {
 
   @Override
-  public StandaloneEventSinkRuntime<B> getRuntime(DataSinkInvocation graph,
+  public StandaloneEventSinkRuntime<T> getRuntime(DataSinkInvocation graph,
                                                   DataSinkParameterExtractor extractor,
                                                   ConfigExtractor configExtractor,
                                                   StreamPipesClient streamPipesClient) {
 
-    ConfiguredEventSink<B> configuredEngine = onInvocation(graph, extractor);
-    EventSinkRuntimeParams<B> runtimeParams = new EventSinkRuntimeParams<>
-            (configuredEngine.getBindingParams(), true, configExtractor, streamPipesClient);
+    ConfiguredEventSink<T> configuredEngine = onInvocation(graph, extractor);
+    EventSinkRuntimeParams<T> runtimeParams =
+        new EventSinkRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            true,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneEventSinkRuntime<>(configuredEngine.getEngineSupplier(), runtimeParams);
   }
 
-  public abstract ConfiguredEventSink<B> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
+  public abstract ConfiguredEventSink<T> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java
index 8e020cf4e..efa85bf6e 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java
@@ -27,23 +27,29 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
 import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneExternalEventProcessorRuntime;
 
-public abstract class StandaloneExternalEventProcessingDeclarer<B extends
-        EventProcessorBindingParams> extends EventProcessorDeclarer<B,
-        StandaloneExternalEventProcessorRuntime<B>> {
+public abstract class StandaloneExternalEventProcessingDeclarer<T extends
+    EventProcessorBindingParams> extends EventProcessorDeclarer<T,
+    StandaloneExternalEventProcessorRuntime<T>> {
 
-  public abstract ConfiguredExternalEventProcessor<B> onInvocation(DataProcessorInvocation graph,
-                                                            ProcessingElementParameterExtractor extractor);
+  public abstract ConfiguredExternalEventProcessor<T> onInvocation(DataProcessorInvocation graph,
+                                                                   ProcessingElementParameterExtractor extractor);
 
   @Override
-  public StandaloneExternalEventProcessorRuntime<B> getRuntime(DataProcessorInvocation graph,
+  public StandaloneExternalEventProcessorRuntime<T> getRuntime(DataProcessorInvocation graph,
                                                                ProcessingElementParameterExtractor extractor,
                                                                ConfigExtractor configExtractor,
                                                                StreamPipesClient streamPipesClient) {
-    ConfiguredExternalEventProcessor<B> configuredEngine = onInvocation(graph, extractor);
-    EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>
-            (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+    ConfiguredExternalEventProcessor<T> configuredEngine = onInvocation(graph, extractor);
+    EventProcessorRuntimeParams<T> runtimeParams =
+        new EventProcessorRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            false,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneExternalEventProcessorRuntime<>(configuredEngine.getEngineSupplier(),
-            runtimeParams);
+        runtimeParams);
   }
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java
index 2815c08ec..3464955c8 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java
@@ -27,24 +27,35 @@ import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventSink;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneExternalEventSinkRuntime;
 
-public abstract class StandaloneExternalEventSinkDeclarer<B extends
-        EventSinkBindingParams> extends EventSinkDeclarer<B,
-                StandaloneExternalEventSinkRuntime<B>> {
+/**
+ * @deprecated since there is no usage
+ * @param <T>
+ */
+@Deprecated(since = "0.90.0", forRemoval = true)
+public abstract class StandaloneExternalEventSinkDeclarer<T extends
+    EventSinkBindingParams> extends EventSinkDeclarer<T,
+    StandaloneExternalEventSinkRuntime<T>> {
 
   @Override
-  public StandaloneExternalEventSinkRuntime<B> getRuntime(DataSinkInvocation graph,
+  public StandaloneExternalEventSinkRuntime<T> getRuntime(DataSinkInvocation graph,
                                                           DataSinkParameterExtractor extractor,
                                                           ConfigExtractor configExtractor,
                                                           StreamPipesClient streamPipesClient) {
 
-    ConfiguredExternalEventSink<B> configuredEngine = onInvocation(graph, extractor);
-    EventSinkRuntimeParams<B> runtimeParams = new EventSinkRuntimeParams<>
-            (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+    ConfiguredExternalEventSink<T> configuredEngine = onInvocation(graph, extractor);
+    EventSinkRuntimeParams<T> runtimeParams =
+        new EventSinkRuntimeParams<>
+        (
+            configuredEngine.getBindingParams(),
+            false,
+            configExtractor,
+            streamPipesClient
+        );
 
     return new StandaloneExternalEventSinkRuntime<>(configuredEngine.getEngineSupplier(),
-            runtimeParams);
+        runtimeParams);
   }
 
-  public abstract ConfiguredExternalEventSink<B> onInvocation(DataSinkInvocation graph,
+  public abstract ConfiguredExternalEventSink<T> onInvocation(DataSinkInvocation graph,
                                                               DataSinkParameterExtractor extractor);
 }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index d9e9aaa50..9373aed89 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -35,6 +35,7 @@ import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.wrapper.routing.RawDataProcessor;
 import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,9 +48,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclarer, RawDataProcessor {
 
   private static final Logger LOG = LoggerFactory.getLogger(StreamPipesFunction.class);
-  private Map<String, SpInputCollector> inputCollectors;
   private final Map<String, SourceInfo> sourceInfoMapper;
   private final Map<String, SchemaInfo> schemaInfoMapper;
+  private Map<String, SpInputCollector> inputCollectors;
 
   public StreamPipesFunction() {
     this.sourceInfoMapper = new HashMap<>();
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
index 299be5780..19c9b1b79 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
@@ -18,31 +18,32 @@
 
 package org.apache.streampipes.wrapper.standalone.manager;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.grounding.TransportFormat;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.wrapper.standalone.routing.StandaloneSpInputCollector;
 import org.apache.streampipes.wrapper.standalone.routing.StandaloneSpOutputCollector;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashMap;
 import java.util.Map;
 
 public class ProtocolManager {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ProtocolManager.class);
   public static Map<String, StandaloneSpInputCollector> consumers = new HashMap<>();
   public static Map<String, StandaloneSpOutputCollector> producers = new HashMap<>();
 
-  private static final Logger LOG = LoggerFactory.getLogger(ProtocolManager.class);
-
   // TODO currently only the topic name is used as an identifier for a consumer/producer. Should
   // be changed by some hashCode implementation in streampipes-model, but this requires changes
   // in empire serializers
 
   public static <T extends TransportProtocol> StandaloneSpInputCollector findInputCollector(T protocol,
                                                                                             TransportFormat format,
-                                                                                            Boolean singletonEngine) throws SpRuntimeException {
+                                                                                            Boolean singletonEngine)
+      throws SpRuntimeException {
 
     if (consumers.containsKey(topicName(protocol))) {
       return consumers.get(topicName(protocol));
@@ -56,14 +57,15 @@ public class ProtocolManager {
 
   public static <T extends TransportProtocol> StandaloneSpOutputCollector findOutputCollector(T protocol,
                                                                                               TransportFormat format,
-                                                                                              String resourceId) throws SpRuntimeException {
+                                                                                              String resourceId)
+      throws SpRuntimeException {
 
     if (producers.containsKey(topicName(protocol))) {
       return producers.get(topicName(protocol));
     } else {
       producers.put(topicName(protocol), makeOutputCollector(protocol, format, resourceId));
       LOG.info("Adding new producer to producer map (size=" + producers.size() + "): " + topicName
-              (protocol));
+          (protocol));
       return producers.get(topicName(protocol));
     }
 
@@ -71,13 +73,15 @@ public class ProtocolManager {
 
   private static <T extends TransportProtocol> StandaloneSpInputCollector<T> makeInputCollector(T protocol,
                                                                                                 TransportFormat format,
-                                                                                                Boolean singletonEngine) throws SpRuntimeException {
+                                                                                                Boolean singletonEngine)
+      throws SpRuntimeException {
     return new StandaloneSpInputCollector<>(protocol, format, singletonEngine);
   }
 
   public static <T extends TransportProtocol> StandaloneSpOutputCollector<T> makeOutputCollector(T protocol,
                                                                                                  TransportFormat format,
-                                                                                                 String resourceId) throws SpRuntimeException {
+                                                                                                 String resourceId)
+      throws SpRuntimeException {
     return new StandaloneSpOutputCollector<>(protocol, format, resourceId);
   }
 
@@ -86,17 +90,17 @@ public class ProtocolManager {
   }
 
   public static <T extends TransportProtocol> void removeInputCollector(T protocol) throws
-          SpRuntimeException {
+      SpRuntimeException {
     consumers.remove(topicName(protocol));
     LOG.info("Removing consumer from consumer map (size=" + consumers.size() + "): " + topicName
-            (protocol));
+        (protocol));
   }
 
   public static <T extends TransportProtocol> void removeOutputCollector(T protocol) throws
-          SpRuntimeException {
+      SpRuntimeException {
     producers.remove(topicName(protocol));
     LOG.info("Removing producer from producer map (size=" + producers.size() + "): " + topicName
-            (protocol));
+        (protocol));
   }
 
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java
index cc7c73818..35dc44304 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java
@@ -29,10 +29,10 @@ import org.apache.streampipes.wrapper.standalone.manager.PManager;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public abstract class StandaloneSpCollector<T extends TransportProtocol, C> implements
-        PipelineElementCollector<C> {
+public abstract class StandaloneSpCollector<T extends TransportProtocol, W> implements
+    PipelineElementCollector<W> {
 
-  protected Map<String, C> consumers;
+  protected Map<String, W> consumers;
 
   protected T transportProtocol;
   protected SpProtocolDefinition<T> protocolDefinition;
@@ -45,15 +45,15 @@ public abstract class StandaloneSpCollector<T extends TransportProtocol, C> impl
   public StandaloneSpCollector(T protocol, TransportFormat format) throws SpRuntimeException {
     this.transportProtocol = protocol;
     this.protocolDefinition = PManager.getProtocolDefinition(protocol).orElseThrow(() -> new
-            SpRuntimeException("Could not find protocol"));
+        SpRuntimeException("Could not find protocol"));
     this.transportFormat = format;
     this.dataFormatDefinition = PManager.getDataFormat(format).orElseThrow(() -> new
-            SpRuntimeException("Could not find format"));
+        SpRuntimeException("Could not find format"));
     this.consumers = new ConcurrentHashMap<>();
     this.topic = transportProtocol.getTopicDefinition().getActualTopicName();
   }
 
-  public void registerConsumer(String routeId, C consumer) {
+  public void registerConsumer(String routeId, W consumer) {
     consumers.put(routeId, consumer);
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
index 77277c6c1..eca3b78bd 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
@@ -27,9 +27,9 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
 
 public class StandaloneSpInputCollector<T extends TransportProtocol> extends
-        StandaloneSpCollector<T, RawDataProcessor>
-        implements
-        InternalEventProcessor<byte[]>, SpInputCollector {
+    StandaloneSpCollector<T, RawDataProcessor>
+    implements
+    InternalEventProcessor<byte[]>, SpInputCollector {
 
   private final Boolean singletonEngine;
 
@@ -42,20 +42,20 @@ public class StandaloneSpInputCollector<T extends TransportProtocol> extends
   @Override
   public void onEvent(byte[] event) {
     if (singletonEngine) {
-     send(consumers.get(consumers.keySet().toArray()[0]), event);
+      send(consumers.get(consumers.keySet().toArray()[0]), event);
     } else {
       consumers.forEach((key, value) -> send(value, event));
     }
   }
 
   private void send(RawDataProcessor rawDataProcessor, byte[] event) {
-      rawDataProcessor.process(dataFormatDefinition.toMap(event), topic);
+    rawDataProcessor.process(dataFormatDefinition.toMap(event), topic);
   }
 
   @Override
   public void connect() throws SpRuntimeException {
     if (!protocolDefinition.getConsumer().isConnected()) {
-      protocolDefinition.getConsumer().connect(transportProtocol,this);
+      protocolDefinition.getConsumer().connect(transportProtocol, this);
     }
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java
index d658e58a3..c6a55a3cc 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java
@@ -30,14 +30,15 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.EventConverter;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 public class StandaloneSpOutputCollector<T extends TransportProtocol> extends
-        StandaloneSpCollector<T, InternalEventProcessor<Map<String,
-                Object>>> implements SpOutputCollector {
+    StandaloneSpCollector<T, InternalEventProcessor<Map<String,
+        Object>>> implements SpOutputCollector {
 
   private static final Logger LOG = LoggerFactory.getLogger(StandaloneSpOutputCollector.class);
 
@@ -47,9 +48,9 @@ public class StandaloneSpOutputCollector<T extends TransportProtocol> extends
   public StandaloneSpOutputCollector(T protocol,
                                      TransportFormat format,
                                      String resourceId) throws SpRuntimeException {
-   super(protocol, format);
-   this.producer = protocolDefinition.getProducer();
-   this.resourceId = resourceId;
+    super(protocol, format);
+    this.producer = protocolDefinition.getProducer();
+    this.resourceId = resourceId;
   }
 
   public void collect(Event event) {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
index 60763cdc5..4d0a686d1 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
@@ -27,22 +27,23 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.function.Supplier;
 
-public class StandaloneEventProcessorRuntime<B extends EventProcessorBindingParams> extends
-    StandalonePipelineElementRuntime<B, DataProcessorInvocation,
-        EventProcessorRuntimeParams<B>, EventProcessorRuntimeContext, EventProcessor<B>> {
+public class StandaloneEventProcessorRuntime<T extends EventProcessorBindingParams> extends
+    StandalonePipelineElementRuntime<T, DataProcessorInvocation,
+        EventProcessorRuntimeParams<T>, EventProcessorRuntimeContext, EventProcessor<T>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(StandaloneEventProcessorRuntime.class);
 
   protected SpOutputCollector outputCollector;
 
-  public StandaloneEventProcessorRuntime(Supplier<EventProcessor<B>> supplier,
-                                         EventProcessorRuntimeParams<B> params) {
+  public StandaloneEventProcessorRuntime(Supplier<EventProcessor<T>> supplier,
+                                         EventProcessorRuntimeParams<T> params) {
     super(supplier, params);
     this.outputCollector = getOutputCollector();
   }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
index d706a2397..5d5156b2a 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
@@ -25,19 +25,20 @@ import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 import org.apache.streampipes.wrapper.routing.SpInputCollector;
 import org.apache.streampipes.wrapper.runtime.EventSink;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.function.Supplier;
 
-public class StandaloneEventSinkRuntime<B extends EventSinkBindingParams> extends
-    StandalonePipelineElementRuntime<B, DataSinkInvocation,
-        EventSinkRuntimeParams<B>, EventSinkRuntimeContext, EventSink<B>> {
+public class StandaloneEventSinkRuntime<T extends EventSinkBindingParams> extends
+    StandalonePipelineElementRuntime<T, DataSinkInvocation,
+        EventSinkRuntimeParams<T>, EventSinkRuntimeContext, EventSink<T>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(StandaloneEventSinkRuntime.class);
 
-  public StandaloneEventSinkRuntime(Supplier<EventSink<B>> supplier, EventSinkRuntimeParams<B>
+  public StandaloneEventSinkRuntime(Supplier<EventSink<T>> supplier, EventSinkRuntimeParams<T>
       params) {
     super(supplier, params);
   }
@@ -55,7 +56,7 @@ public class StandaloneEventSinkRuntime<B extends EventSinkBindingParams> extend
       monitoringManager.increaseInCounter(resourceId, sourceInfo, System.currentTimeMillis());
       engine.onEvent(params.makeEvent(rawEvent, sourceInfo));
     } catch (RuntimeException e) {
-      LOG.error("RuntimeException while processing event in {}", engine.getClass().getCanonicalName() , e);
+      LOG.error("RuntimeException while processing event in {}", engine.getClass().getCanonicalName(), e);
       addLogEntry(e);
     }
   }
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java
index 01c01878d..5e0dd840f 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java
@@ -27,12 +27,12 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
 import java.util.Map;
 import java.util.function.Supplier;
 
-public class StandaloneExternalEventProcessorRuntime<B extends EventProcessorBindingParams> extends
-        StandalonePipelineElementRuntime<B, DataProcessorInvocation,
-                EventProcessorRuntimeParams<B>, EventProcessorRuntimeContext, ExternalEventProcessor<B>> {
+public class StandaloneExternalEventProcessorRuntime<T extends EventProcessorBindingParams> extends
+    StandalonePipelineElementRuntime<T, DataProcessorInvocation,
+        EventProcessorRuntimeParams<T>, EventProcessorRuntimeContext, ExternalEventProcessor<T>> {
 
-  public StandaloneExternalEventProcessorRuntime(Supplier<ExternalEventProcessor<B>> supplier,
-                                                 EventProcessorRuntimeParams<B> runtimeParams) {
+  public StandaloneExternalEventProcessorRuntime(Supplier<ExternalEventProcessor<T>> supplier,
+                                                 EventProcessorRuntimeParams<T> runtimeParams) {
     super(supplier, runtimeParams);
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java
index 418702da1..de5f4c881 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java
@@ -27,11 +27,12 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventSink;
 import java.util.Map;
 import java.util.function.Supplier;
 
-public class StandaloneExternalEventSinkRuntime<B extends EventSinkBindingParams> extends
-        StandalonePipelineElementRuntime<B, DataSinkInvocation,
-                EventSinkRuntimeParams<B>, EventSinkRuntimeContext, ExternalEventSink<B>> {
+public class StandaloneExternalEventSinkRuntime<T extends EventSinkBindingParams> extends
+    StandalonePipelineElementRuntime<T, DataSinkInvocation,
+        EventSinkRuntimeParams<T>, EventSinkRuntimeContext, ExternalEventSink<T>> {
 
-  public StandaloneExternalEventSinkRuntime(Supplier<ExternalEventSink<B>> supplier, EventSinkRuntimeParams<B> runtimeParams) {
+  public StandaloneExternalEventSinkRuntime(Supplier<ExternalEventSink<T>> supplier,
+                                            EventSinkRuntimeParams<T> runtimeParams) {
     super(supplier, runtimeParams);
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
index 4a046f33e..eab751d27 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
@@ -36,19 +36,19 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Supplier;
 
-public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I>,
-        I extends InvocableStreamPipesEntity,
-        RP extends RuntimeParams<B, I, RC>,
-        RC extends RuntimeContext,
-        P extends PipelineElement<B, I>>
-        extends PipelineElementRuntime implements RawDataProcessor {
+public abstract class StandalonePipelineElementRuntime<T extends BindingParams<K>,
+    K extends InvocableStreamPipesEntity,
+    V extends RuntimeParams<T, K, X>,
+    X extends RuntimeContext,
+    PeT extends PipelineElement<T, K>>
+    extends PipelineElementRuntime implements RawDataProcessor {
 
-  protected RP params;
-  protected final P engine;
+  protected final PeT engine;
+  protected V params;
   protected SpMonitoringManager monitoringManager;
   protected String resourceId;
 
-  public StandalonePipelineElementRuntime(Supplier<P> supplier, RP runtimeParams) {
+  public StandalonePipelineElementRuntime(Supplier<PeT> supplier, V runtimeParams) {
     super();
     this.engine = supplier.get();
     this.params = runtimeParams;
@@ -56,7 +56,7 @@ public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I
     this.resourceId = params.getBindingParams().getGraph().getElementId();
   }
 
-  public P getEngine() {
+  public PeT getEngine() {
     return engine;
   }
 
@@ -69,8 +69,8 @@ public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I
     List<SpInputCollector> inputCollectors = new ArrayList<>();
     for (SpDataStream is : params.getBindingParams().getGraph().getInputStreams()) {
       inputCollectors.add(ProtocolManager.findInputCollector(is.getEventGrounding()
-                      .getTransportProtocol(), is.getEventGrounding().getTransportFormats().get(0),
-              params.isSingletonEngine()));
+              .getTransportProtocol(), is.getEventGrounding().getTransportFormats().get(0),
+          params.isSingletonEngine()));
     }
     return inputCollectors;
   }


[streampipes] 07/07: add checkstyle to streampipes-wrapper-python

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 94e59eb108810bf937854e2bf0dd109196d2cc44
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:08:52 2022 +0100

    add checkstyle to streampipes-wrapper-python
---
 streampipes-wrapper-python/README.md               | 22 ++++++++++++++--------
 streampipes-wrapper-python/pom.xml                 | 14 ++++++++++----
 .../streampipes/api/templates/index.html           | 19 ++++++++++---------
 3 files changed, 34 insertions(+), 21 deletions(-)

diff --git a/streampipes-wrapper-python/README.md b/streampipes-wrapper-python/README.md
index 16c471e5a..9a6143790 100644
--- a/streampipes-wrapper-python/README.md
+++ b/streampipes-wrapper-python/README.md
@@ -24,24 +24,30 @@
 [![Twitter](https://img.shields.io/twitter/follow/StreamPipes.svg?label=Follow&style=social)](https://twitter.com/StreamPipes)
 
 ## Apache StreamPipes Wrapper for Python [WIP]
-**NOTE**: 
 
-> The StreamPipes wrapper for python is currently under development. Thus, the processor model description still needs to be implemented externally in Java.
+**NOTE**:
+
+> The StreamPipes wrapper for python is currently under development. Thus, the processor model description still needs
+> to be implemented externally in Java.
 
 ## Apache StreamPipes
-Apache StreamPipes enables flexible modeling of stream processing pipelines by providing a graphical 
-modeling editor on top of existing stream processing frameworks.
 
-It leverages non-technical users to quickly define and execute processing pipelines based on an easily extensible 
-toolbox of data sources, data processors and data sinks. StreamPipes has an exchangeable runtime execution layer and executes pipelines using one of the provided wrappers, e.g., for Apache Flink or Apache Kafka Streams.
+Apache StreamPipes enables flexible modeling of stream processing pipelines by providing a graphical
+modeling editor on top of existing stream processing frameworks.
 
-Pipeline elements in StreamPipes can be installed at runtime - the built-in SDK allows to easily implement new 
-pipeline elements according to your needs. Pipeline elements are standalone microservices that can run anywhere - centrally on your server, in a large-scale cluster or close at the edge.
+It leverages non-technical users to quickly define and execute processing pipelines based on an easily extensible
+toolbox of data sources, data processors and data sinks. StreamPipes has an exchangeable runtime execution layer and
+executes pipelines using one of the provided wrappers, e.g., for Apache Flink or Apache Kafka Streams.
 
+Pipeline elements in StreamPipes can be installed at runtime - the built-in SDK allows to easily implement new
+pipeline elements according to your needs. Pipeline elements are standalone microservices that can run anywhere -
+centrally on your server, in a large-scale cluster or close at the edge.
 
 ## A Speudocode Example
+
 **NOTE**:
 Only works in combination with Java!
+
 ````
 from streampipes.core import StandaloneModelSubmitter
 from streampipes.manager import Declarer
diff --git a/streampipes-wrapper-python/pom.xml b/streampipes-wrapper-python/pom.xml
index ec7f52f27..86542fdfb 100644
--- a/streampipes-wrapper-python/pom.xml
+++ b/streampipes-wrapper-python/pom.xml
@@ -16,15 +16,21 @@
   ~ limitations under the License.
   ~
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>streampipes-parent</artifactId>
         <groupId>org.apache.streampipes</groupId>
         <version>0.91.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-
     <artifactId>streampipes-wrapper-python</artifactId>
-
-
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/streampipes-wrapper-python/streampipes/api/templates/index.html b/streampipes-wrapper-python/streampipes/api/templates/index.html
index 7fd1f7d29..4454426fb 100644
--- a/streampipes-wrapper-python/streampipes/api/templates/index.html
+++ b/streampipes-wrapper-python/streampipes/api/templates/index.html
@@ -20,13 +20,14 @@
 <html lang="en">
 <head>
     <title>StreamPipes Python</title>
-    <meta name="viewport" content="width=device-width, initial-scale=1">
-    <link type="text/css" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css" rel="stylesheet">
+    <meta content="width=device-width, initial-scale=1" name="viewport">
+    <link href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css" rel="stylesheet" type="text/css">
     <style type="text/css">
         body {padding-top: 70px;}
+
     </style>
-    <script type="text/javascript" src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/js/bootstrap.min.js"></script>
-    <script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.1/jquery.min.js"></script>
+    <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/js/bootstrap.min.js" type="text/javascript"></script>
+    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.1/jquery.min.js" type="text/javascript"></script>
 </head>
 <body>
 <nav class="navbar navbar-inverse navbar-fixed-top" style="background: rgb(57, 181, 74)">
@@ -40,11 +41,11 @@
     <h4>This is a developer-oriented view. Navigate to 'Install Pipeline Elements' in the
         StreamPipes UI to import the elements shown here.
     </h4>
-            {% for processor in processors %}
-            <h3>{{ processor.name }}</h3>
-            <h4>URI: <a href="{{ processor.uri }}">{{ processor.uri }}</a></h4>
-            <h4>Description: {{ processor.description }}</h4>
-            {% endfor %}
+    {% for processor in processors %}
+    <h3>{{ processor.name }}</h3>
+    <h4>URI: <a href="{{ processor.uri }}">{{ processor.uri }}</a></h4>
+    <h4>Description: {{ processor.description }}</h4>
+    {% endfor %}
 </div>
 </body>
 </html>
\ No newline at end of file


[streampipes] 06/07: add checkstyle to streampipes-wrapper-kafka-streams

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 17622e6afe4b69ba0a588c0f5e2e689ed873cd76
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:07:48 2022 +0100

    add checkstyle to streampipes-wrapper-kafka-streams
---
 streampipes-wrapper-kafka-streams/pom.xml          | 11 ++++++++++-
 .../kafka/KafkaStreamsDataProcessorDeclarer.java   |  2 +-
 .../kafka/KafkaStreamsDataProcessorRuntime.java    | 23 +++++++++++-----------
 .../kafka/KafkaStreamsDataSinkDeclarer.java        |  4 ++--
 .../wrapper/kafka/KafkaStreamsDataSinkRuntime.java |  2 +-
 .../wrapper/kafka/KafkaStreamsRuntime.java         | 21 ++++++++++----------
 .../wrapper/kafka/converter/JsonToMapFormat.java   |  7 ++++---
 7 files changed, 41 insertions(+), 29 deletions(-)

diff --git a/streampipes-wrapper-kafka-streams/pom.xml b/streampipes-wrapper-kafka-streams/pom.xml
index 1362a7770..bf2d19657 100644
--- a/streampipes-wrapper-kafka-streams/pom.xml
+++ b/streampipes-wrapper-kafka-streams/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>streampipes-parent</artifactId>
         <groupId>org.apache.streampipes</groupId>
@@ -59,4 +60,12 @@
             </exclusions>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java
index 633f20039..36b33d975 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java
@@ -21,7 +21,7 @@ import org.apache.streampipes.wrapper.declarer.EventProcessorDeclarer;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 public abstract class KafkaStreamsDataProcessorDeclarer<B extends
-        EventProcessorBindingParams> extends EventProcessorDeclarer<B, KafkaStreamsDataProcessorRuntime> {
+    EventProcessorBindingParams> extends EventProcessorDeclarer<B, KafkaStreamsDataProcessorRuntime> {
 
 
 }
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java
index 79cbbdd29..02cd53799 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java
@@ -17,10 +17,6 @@
  */
 package org.apache.streampipes.wrapper.kafka;
 
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -32,13 +28,18 @@ import org.apache.streampipes.wrapper.kafka.converter.MapToJsonFormat;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;
 
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
 import java.util.Map;
 import java.util.regex.Pattern;
 
 public abstract class KafkaStreamsDataProcessorRuntime<B extends
-        EventProcessorBindingParams>
-        extends KafkaStreamsRuntime<EventProcessorRuntimeParams<B>, B,
-        DataProcessorInvocation, EventProcessorRuntimeContext> {
+    EventProcessorBindingParams>
+    extends KafkaStreamsRuntime<EventProcessorRuntimeParams<B>, B,
+    DataProcessorInvocation, EventProcessorRuntimeContext> {
 
 
   public KafkaStreamsDataProcessorRuntime(EventProcessorRuntimeParams<B> runtimeParams) {
@@ -53,7 +54,7 @@ public abstract class KafkaStreamsDataProcessorRuntime<B extends
       StreamsBuilder builder = new StreamsBuilder();
       SpDataStream inputStream = runtimeParams.getBindingParams().getGraph().getInputStreams().get(0);
       TransportProtocol protocol = protocol(inputStream);
-      KStream<String, String> stream ;
+      KStream<String, String> stream;
 
       if (protocol.getTopicDefinition() instanceof SimpleTopicDefinition) {
         stream = builder.stream(getTopic(inputStream));
@@ -62,12 +63,12 @@ public abstract class KafkaStreamsDataProcessorRuntime<B extends
       }
 
       KStream<String, Map<String, Object>> mapFormat = stream.flatMapValues((ValueMapper<String, Iterable<Map<String,
-              Object>>>) s -> new JsonToMapFormat(getGraph()).apply(s));
+          Object>>>) s -> new JsonToMapFormat(getGraph()).apply(s));
 
       KStream<String, String> outStream = getApplicationLogic(mapFormat).flatMapValues(new
-              MapToJsonFormat());
+          MapToJsonFormat());
       outStream.to(getTopic(runtimeParams.getBindingParams().getGraph()
-              .getOutputStream()));
+          .getOutputStream()));
       streams = new KafkaStreams(builder.build(), config);
 
       streams.start();
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java
index 36dfa4e8d..349f6b556 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java
@@ -20,6 +20,6 @@ package org.apache.streampipes.wrapper.kafka;
 import org.apache.streampipes.wrapper.declarer.EventSinkDeclarer;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 
-public abstract class KafkaStreamsDataSinkDeclarer <B extends
-        EventSinkBindingParams> extends EventSinkDeclarer<B, KafkaStreamsDataSinkRuntime> {
+public abstract class KafkaStreamsDataSinkDeclarer<B extends
+    EventSinkBindingParams> extends EventSinkDeclarer<B, KafkaStreamsDataSinkRuntime> {
 }
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java
index 9e9b8dfa2..56270602e 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java
@@ -24,7 +24,7 @@ import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 
 public abstract class KafkaStreamsDataSinkRuntime<B extends EventSinkBindingParams>
-        extends KafkaStreamsRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
+    extends KafkaStreamsRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
 
 
   public KafkaStreamsDataSinkRuntime(EventSinkRuntimeParams<B> runtimeParams) {
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java
index cdb8d665a..cc4bd5c63 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java
@@ -17,10 +17,6 @@
  */
 package org.apache.streampipes.wrapper.kafka;
 
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.wrapper.context.RuntimeContext;
@@ -28,12 +24,17 @@ import org.apache.streampipes.wrapper.distributed.runtime.DistributedRuntime;
 import org.apache.streampipes.wrapper.params.binding.BindingParams;
 import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+
 import java.util.Map;
 import java.util.Properties;
 
 public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B extends
-        BindingParams<I>, I extends InvocableStreamPipesEntity, RC extends RuntimeContext> extends
-        DistributedRuntime<RP, B, I, RC> {
+    BindingParams<I>, I extends InvocableStreamPipesEntity, RC extends RuntimeContext> extends
+    DistributedRuntime<RP, B, I, RC> {
 
   Properties config;
   KafkaStreams streams;
@@ -46,10 +47,10 @@ public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B
   public void prepareRuntime() throws SpRuntimeException {
     config = new Properties();
     config.put(StreamsConfig.APPLICATION_ID_CONFIG, gneerateApplicationId(runtimeParams.getBindingParams()
-            .getGraph()
-            .getElementId()));
+        .getGraph()
+        .getElementId()));
     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaUrl(runtimeParams.getBindingParams().getGraph()
-            .getInputStreams().get(0)));
+        .getInputStreams().get(0)));
     config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
     config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
   }
@@ -69,6 +70,6 @@ public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B
   }
 
   protected abstract KStream<String, Map<String, Object>> getApplicationLogic(KStream<String, Map<String, Object>>...
-                                                                           inputStreams);
+                                                                                  inputStreams);
 
 }
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java
index 7173d7c40..9a077d313 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java
@@ -17,18 +17,19 @@
  */
 package org.apache.streampipes.wrapper.kafka.converter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.streampipes.logging.impl.EventStatisticLogger;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 public class JsonToMapFormat implements ValueMapper<String, Iterable<Map<String,
-        Object>>> {
+    Object>>> {
 
   private ObjectMapper mapper;
   private InvocableStreamPipesEntity graph;


[streampipes] 04/07: add checkstyle to streampipes-wrapper-distributed

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 0bcd4f06b1e1c55e50e32099b9140c21cc7eb03b
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:02:55 2022 +0100

    add checkstyle to streampipes-wrapper-distributed
---
 streampipes-wrapper-distributed/pom.xml            | 11 +++++-
 .../distributed/runtime/DistributedRuntime.java    | 42 ++++++++++++----------
 2 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/streampipes-wrapper-distributed/pom.xml b/streampipes-wrapper-distributed/pom.xml
index e20f93b6d..2b98c9358 100644
--- a/streampipes-wrapper-distributed/pom.xml
+++ b/streampipes-wrapper-distributed/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>streampipes-parent</artifactId>
         <groupId>org.apache.streampipes</groupId>
@@ -50,4 +51,12 @@
             <version>0.91.0-SNAPSHOT</version>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
index 4afc4fa83..d4dbfe25a 100644
--- a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
+++ b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
@@ -25,7 +25,11 @@ import org.apache.streampipes.messaging.kafka.config.ConsumerConfigFactory;
 import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.grounding.*;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.wrapper.context.RuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.BindingParams;
 import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
@@ -33,24 +37,24 @@ import org.apache.streampipes.wrapper.runtime.PipelineElementRuntime;
 
 import java.util.Properties;
 
-public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B extends
-        BindingParams<I>, I extends InvocableStreamPipesEntity, RC extends RuntimeContext> extends
-        PipelineElementRuntime {
+public abstract class DistributedRuntime<RpT extends RuntimeParams<V, W, X>, V extends
+    BindingParams<W>, W extends InvocableStreamPipesEntity, X extends RuntimeContext> extends
+    PipelineElementRuntime {
 
-  protected RP runtimeParams;
-  protected B bindingParams;
+  protected RpT runtimeParams;
+  protected V bindingParams;
 
   @Deprecated
-  protected B params;
+  protected V params;
 
-  public DistributedRuntime(RP runtimeParams) {
+  public DistributedRuntime(RpT runtimeParams) {
     super();
     this.runtimeParams = runtimeParams;
     this.bindingParams = runtimeParams.getBindingParams();
     this.params = runtimeParams.getBindingParams();
   }
 
-  public DistributedRuntime(B bindingParams,
+  public DistributedRuntime(V bindingParams,
                             ConfigExtractor configExtractor,
                             StreamPipesClient streamPipesClient) {
     super();
@@ -59,7 +63,7 @@ public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B e
     this.runtimeParams = makeRuntimeParams(configExtractor, streamPipesClient);
   }
 
-  protected I getGraph() {
+  protected W getGraph() {
     return runtimeParams.getBindingParams().getGraph();
   }
 
@@ -77,8 +81,8 @@ public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B e
 
   protected String getTopic(SpDataStream stream) {
     return protocol(stream)
-            .getTopicDefinition()
-            .getActualTopicName();
+        .getTopicDefinition()
+        .getActualTopicName();
   }
 
   protected JmsTransportProtocol getJmsProtocol(SpDataStream stream) {
@@ -103,15 +107,15 @@ public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B e
 
   protected TransportProtocol protocol(SpDataStream stream) {
     return stream
-            .getEventGrounding()
-            .getTransportProtocol();
+        .getEventGrounding()
+        .getTransportProtocol();
   }
 
   protected String getKafkaUrl(SpDataStream stream) {
     // TODO add also jms support
-    return protocol(stream).getBrokerHostname() +
-            ":" +
-            ((KafkaTransportProtocol) protocol(stream)).getKafkaPort();
+    return protocol(stream).getBrokerHostname()
+        + ":"
+        + ((KafkaTransportProtocol) protocol(stream)).getKafkaPort();
   }
 
   protected String replaceWildcardWithPatternFormat(String topic) {
@@ -119,7 +123,7 @@ public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B e
     return topic.replaceAll("\\*", ".*");
   }
 
-  protected abstract RP makeRuntimeParams(ConfigExtractor configExtractor,
-                                          StreamPipesClient streamPipesClient);
+  protected abstract RpT makeRuntimeParams(ConfigExtractor configExtractor,
+                                           StreamPipesClient streamPipesClient);
 
 }


[streampipes] 01/07: remove suppression for vocabulary

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 94c3f2dff1d57a906b18a8d0a62a71434b9b47ab
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 20:40:48 2022 +0100

    remove suppression for vocabulary
---
 tools/maven/suppressions.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 09c51005e..e52a442bc 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -21,7 +21,6 @@
         "-//Puppy Crawl//DTD Suppressions 1.1//EN"
         "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
 <suppressions>
-    <suppress checks="." files="[\\/]org.apache.streampipes.vocabulary[\\/].*\.java$"/>
     <suppress checks="." files="org.apache.streampipes.processors.geo.jvm.processor.revgeocoder.geocode"/>
     <suppress checks="." files="com.github.jqudt"/>
     <suppress checks="." files="com.kohlschutter.boilerpipe"/>


[streampipes] 05/07: add checkstyle to streampipes-wrapper-flink

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 1e73b24fd1efd659822289bd7dfc0bb07d627c1c
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:06:56 2022 +0100

    add checkstyle to streampipes-wrapper-flink
---
 streampipes-wrapper-flink/pom.xml                  | 11 ++-
 .../wrapper/flink/FlinkDataProcessorDeclarer.java  |  4 +-
 .../wrapper/flink/FlinkDataProcessorRuntime.java   | 32 ++++-----
 .../wrapper/flink/FlinkDataSinkDeclarer.java       |  4 +-
 .../wrapper/flink/FlinkDataSinkRuntime.java        | 15 +++--
 .../flink/FlinkMiniClusterDeploymentConfig.java    |  6 +-
 .../streampipes/wrapper/flink/FlinkRuntime.java    | 78 ++++++++++++----------
 .../wrapper/flink/FlinkSpMiniCluster.java          |  8 +--
 .../wrapper/flink/consumer/JmsFlinkConsumer.java   |  7 +-
 .../wrapper/flink/consumer/MqttFlinkConsumer.java  |  5 +-
 .../flink/converter/EventToMapConverter.java       |  5 +-
 .../flink/converter/MapToEventConverter.java       | 13 ++--
 .../wrapper/flink/logger/StatisticLogger.java      | 23 ++++---
 .../flink/serializer/ByteArrayDeserializer.java    |  3 +-
 .../flink/serializer/ByteArraySerializer.java      |  3 +-
 .../flink/serializer/SimpleJmsSerializer.java      | 34 +++++-----
 .../wrapper/flink/sink/JmsFlinkProducer.java       |  7 +-
 .../wrapper/flink/sink/MqttFlinkProducer.java      |  7 +-
 .../flink/status/PipelineElementStatusSender.java  |  3 +-
 .../status/PipelineElementStatusSenderFactory.java | 10 +--
 20 files changed, 154 insertions(+), 124 deletions(-)

diff --git a/streampipes-wrapper-flink/pom.xml b/streampipes-wrapper-flink/pom.xml
index 6208b71b8..170f966cb 100644
--- a/streampipes-wrapper-flink/pom.xml
+++ b/streampipes-wrapper-flink/pom.xml
@@ -16,7 +16,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.streampipes</groupId>
@@ -111,4 +112,12 @@
             <artifactId>snappy-java</artifactId>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java
index 72405b983..d6bd199d2 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java
@@ -21,8 +21,8 @@ package org.apache.streampipes.wrapper.flink;
 import org.apache.streampipes.wrapper.declarer.EventProcessorDeclarer;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
-public abstract class FlinkDataProcessorDeclarer<B extends EventProcessorBindingParams>
-        extends EventProcessorDeclarer<B, FlinkDataProcessorRuntime<B>> {
+public abstract class FlinkDataProcessorDeclarer<T extends EventProcessorBindingParams>
+    extends EventProcessorDeclarer<T, FlinkDataProcessorRuntime<T>> {
 
 
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
index 23baa1cdf..f1269507f 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
@@ -18,8 +18,6 @@
 
 package org.apache.streampipes.wrapper.flink;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
@@ -35,6 +33,9 @@ import org.apache.streampipes.wrapper.flink.sink.JmsFlinkProducer;
 import org.apache.streampipes.wrapper.flink.sink.MqttFlinkProducer;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,15 +43,15 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
-public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingParams> extends
-        FlinkRuntime<EventProcessorRuntimeParams<B>, B,
+public abstract class FlinkDataProcessorRuntime<T extends EventProcessorBindingParams> extends
+    FlinkRuntime<EventProcessorRuntimeParams<T>, T,
         DataProcessorInvocation, EventProcessorRuntimeContext> {
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(FlinkDataProcessorRuntime.class);
 
 
-  public FlinkDataProcessorRuntime(B params,
+  public FlinkDataProcessorRuntime(T params,
                                    ConfigExtractor configExtractor,
                                    StreamPipesClient streamPipesClient) {
     super(params, configExtractor, streamPipesClient);
@@ -59,25 +60,25 @@ public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingP
   @SuppressWarnings("deprecation")
   public void appendExecutionConfig(DataStream<Event>... convertedStream) {
     DataStream<Map<String, Object>> applicationLogic = getApplicationLogic(convertedStream).flatMap
-            (new EventToMapConverter());
+        (new EventToMapConverter());
 
     EventGrounding outputGrounding = getOutputStream().getEventGrounding();
     SpDataFormatDefinition outputDataFormatDefinition =
-            getDataFormatDefinition(outputGrounding.getTransportFormats().get(0));
+        getDataFormatDefinition(outputGrounding.getTransportFormats().get(0));
 
     ByteArraySerializer serializer =
-            new ByteArraySerializer(outputDataFormatDefinition);
+        new ByteArraySerializer(outputDataFormatDefinition);
     if (isKafkaProtocol(getOutputStream())) {
       applicationLogic
-              .addSink(new FlinkKafkaProducer<>(getTopic(getOutputStream()),
-                      serializer,
-                      getProducerProperties((KafkaTransportProtocol) outputGrounding.getTransportProtocol())));
+          .addSink(new FlinkKafkaProducer<>(getTopic(getOutputStream()),
+              serializer,
+              getProducerProperties((KafkaTransportProtocol) outputGrounding.getTransportProtocol())));
     } else if (isJmsProtocol(getOutputStream())) {
       applicationLogic
-              .addSink(new JmsFlinkProducer(getJmsProtocol(getOutputStream()), serializer));
+          .addSink(new JmsFlinkProducer(getJmsProtocol(getOutputStream()), serializer));
     } else if (isMqttProtocol(getOutputStream())) {
       applicationLogic
-              .addSink(new MqttFlinkProducer(getMqttProtocol(getOutputStream()), serializer));
+          .addSink(new MqttFlinkProducer(getMqttProtocol(getOutputStream()), serializer));
     }
 
   }
@@ -100,9 +101,10 @@ public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingP
   }
 
   @Override
-  protected EventProcessorRuntimeParams<B> makeRuntimeParams(ConfigExtractor configExtractor,
+  protected EventProcessorRuntimeParams<T> makeRuntimeParams(ConfigExtractor configExtractor,
                                                              StreamPipesClient streamPipesClient) {
-    LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by a deployed Flink program due to non-serializable classes.");
+    LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by"
+        + " a deployed Flink program due to non-serializable classes.");
     return new EventProcessorRuntimeParams<>(bindingParams, false, null, null);
   }
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java
index 4af5d9190..a9001b6a4 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.wrapper.flink;
 import org.apache.streampipes.wrapper.declarer.EventSinkDeclarer;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 
-public abstract class FlinkDataSinkDeclarer<B extends EventSinkBindingParams>
-        extends EventSinkDeclarer<B, FlinkDataSinkRuntime<B>> {
+public abstract class FlinkDataSinkDeclarer<T extends EventSinkBindingParams>
+    extends EventSinkDeclarer<T, FlinkDataSinkRuntime<T>> {
 
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
index d633eec58..e9f062f11 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.wrapper.flink;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -26,16 +25,18 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class FlinkDataSinkRuntime<B extends EventSinkBindingParams> extends
-        FlinkRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
+public abstract class FlinkDataSinkRuntime<T extends EventSinkBindingParams> extends
+    FlinkRuntime<EventSinkRuntimeParams<T>, T, DataSinkInvocation, EventSinkRuntimeContext> {
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(FlinkDataSinkRuntime.class);
 
-  public FlinkDataSinkRuntime(B params,
+  public FlinkDataSinkRuntime(T params,
                               ConfigExtractor configExtractor,
                               StreamPipesClient streamPipesClient) {
     super(params, configExtractor, streamPipesClient);
@@ -49,9 +50,11 @@ public abstract class FlinkDataSinkRuntime<B extends EventSinkBindingParams> ext
 
   public abstract void getSink(DataStream<Event>... convertedStream1);
 
-  protected EventSinkRuntimeParams<B> makeRuntimeParams(ConfigExtractor configExtractor,
+  protected EventSinkRuntimeParams<T> makeRuntimeParams(ConfigExtractor configExtractor,
                                                         StreamPipesClient streamPipesClient) {
-    LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by a deployed Flink program due to non-serializable classes.");
+    LOG.warn(
+        "The config extractor and StreamPipes Client can currently not be accessed by"
+            + " a deployed Flink program due to non-serializable classes.");
     return new EventSinkRuntimeParams<>(bindingParams, false, null, null);
   }
 
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
index 3501c78cf..9ede5d55d 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.wrapper.flink;
 
 public class FlinkMiniClusterDeploymentConfig extends FlinkDeploymentConfig {
 
-    public FlinkMiniClusterDeploymentConfig() {
-        super("", "localhost", 6123, true);
-    }
+  public FlinkMiniClusterDeploymentConfig() {
+    super("", "localhost", 6123, true);
+  }
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
index b1f688385..e25cac0eb 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
@@ -18,25 +18,18 @@
 
 package org.apache.streampipes.wrapper.flink;
 
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.grounding.*;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.RuntimeContext;
 import org.apache.streampipes.wrapper.distributed.runtime.DistributedRuntime;
@@ -48,6 +41,19 @@ import org.apache.streampipes.wrapper.flink.serializer.ByteArrayDeserializer;
 import org.apache.streampipes.wrapper.params.binding.BindingParams;
 import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
 
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
@@ -55,10 +61,10 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Pattern;
 
-public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends BindingParams<I>, I
-        extends
-        InvocableStreamPipesEntity, RC extends RuntimeContext> extends
-        DistributedRuntime<RP, B, I, RC> implements Runnable, Serializable {
+public abstract class FlinkRuntime<T extends RuntimeParams<V, W, X>, V extends BindingParams<W>, W
+    extends
+        InvocableStreamPipesEntity, X extends RuntimeContext> extends
+    DistributedRuntime<T, V, W, X> implements Runnable, Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -66,7 +72,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
   protected FlinkDeploymentConfig config;
   private StreamExecutionEnvironment env;
 
-  public FlinkRuntime(B bindingParams,
+  public FlinkRuntime(V bindingParams,
                       ConfigExtractor configExtractor,
                       StreamPipesClient streamPipesClient) {
     super(bindingParams, configExtractor, streamPipesClient);
@@ -80,9 +86,9 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
       } else {
         FlinkSpMiniCluster.INSTANCE.start();
         FlinkSpMiniCluster
-                .INSTANCE
-                .getClusterClient()
-                .submitJob(env.getStreamGraph(bindingParams.getGraph().getElementId()).getJobGraph());
+            .INSTANCE
+            .getClusterClient()
+            .submitJob(env.getStreamGraph(bindingParams.getGraph().getElementId()).getJobGraph());
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -135,7 +141,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
   }
 
   private SourceFunction<Map<String, Object>> getJmsConsumer(JmsTransportProtocol protocol,
-                                                SpDataFormatDefinition spDataFormatDefinition) {
+                                                             SpDataFormatDefinition spDataFormatDefinition) {
     return new JmsFlinkConsumer(protocol, spDataFormatDefinition);
   }
 
@@ -148,12 +154,12 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
                                                                SpDataFormatDefinition spDataFormatDefinition) {
     if (protocol.getTopicDefinition() instanceof SimpleTopicDefinition) {
       return new FlinkKafkaConsumer<>(protocol
-              .getTopicDefinition()
-              .getActualTopicName(), new ByteArrayDeserializer(spDataFormatDefinition), getProperties(protocol));
+          .getTopicDefinition()
+          .getActualTopicName(), new ByteArrayDeserializer(spDataFormatDefinition), getProperties(protocol));
     } else {
       String patternTopic = replaceWildcardWithPatternFormat(protocol.getTopicDefinition().getActualTopicName());
       return new FlinkKafkaConsumer<>(Pattern.compile(patternTopic), new ByteArrayDeserializer(spDataFormatDefinition),
-              getProperties(protocol));
+          getProperties(protocol));
     }
   }
 
@@ -163,7 +169,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
       this.env = StreamExecutionEnvironment.createLocalEnvironment();
     } else {
       this.env = StreamExecutionEnvironment
-              .createRemoteEnvironment(config.getHost(), config.getPort(), config.getJarFile());
+          .createRemoteEnvironment(config.getHost(), config.getPort(), config.getJarFile());
     }
 
     appendEnvironmentConfig(this.env);
@@ -188,10 +194,10 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
   private DataStream<Event> addSource(SourceFunction<Map<String, Object>> sourceFunction,
                                       Integer sourceIndex) {
     return env
-            .addSource(sourceFunction)
-            .flatMap(new MapToEventConverter<>(runtimeParams.getSourceInfo(sourceIndex).getSourceId(),
-                    runtimeParams))
-            .flatMap(new StatisticLogger(getGraph()));
+        .addSource(sourceFunction)
+        .flatMap(new MapToEventConverter<>(runtimeParams.getSourceInfo(sourceIndex).getSourceId(),
+            runtimeParams))
+        .flatMap(new StatisticLogger(getGraph()));
   }
 
   @Override
@@ -219,9 +225,9 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
             count++;
             Thread.sleep(1000);
             Optional<JobStatusMessage> statusMessageOpt =
-                    getJobStatus(bindingParams.getGraph().getElementId());
+                getJobStatus(bindingParams.getGraph().getElementId());
             if (statusMessageOpt.isPresent()
-                    && statusMessageOpt.get().getJobState().name().equals("RUNNING")) {
+                && statusMessageOpt.get().getJobState().name().equals("RUNNING")) {
               isDeployed = true;
             }
 
@@ -246,7 +252,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
     try {
       ClusterClient<? extends Comparable<? extends Comparable<?>>> clusterClient = getClusterClient();
       Optional<JobStatusMessage> jobStatusMessage =
-              getJobStatus(bindingParams.getGraph().getElementId());
+          getJobStatus(bindingParams.getGraph().getElementId());
       if (jobStatusMessage.isPresent()) {
         String jobStatusStr = jobStatusMessage.get().getJobState().name();
         // Cancel the job if running
@@ -304,15 +310,15 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
 
       // First, find a job with Running status
       Optional<JobStatusMessage> job = jobsFound.stream()
-              .filter(j -> j.getJobName().equals(jobName) && j.getJobState().name().equals("RUNNING")).findFirst();
+          .filter(j -> j.getJobName().equals(jobName) && j.getJobState().name().equals("RUNNING")).findFirst();
       if (job.isPresent()) {
         return job;
       }
 
       // Otherwise return job with any other status
       return jobs.get()
-              .stream()
-              .filter(j -> j.getJobName().equals(jobName)).findFirst();
+          .stream()
+          .filter(j -> j.getJobName().equals(jobName)).findFirst();
 
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java
index 0d1260dfa..1fccbe4a7 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java
@@ -36,10 +36,10 @@ public enum FlinkSpMiniCluster {
     Configuration configuration = new Configuration();
     configuration.setString(RestOptions.BIND_PORT, "0");
     MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration
-            .Builder()
-            .setConfiguration(configuration)
-            .setNumTaskManagers(2)
-            .build();
+        .Builder()
+        .setConfiguration(configuration)
+        .setNumTaskManagers(2)
+        .build();
     this.miniCluster = new MiniCluster(miniClusterConfiguration);
   }
 
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java
index 8315e6c95..3706c818a 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java
@@ -17,15 +17,16 @@
  */
 package org.apache.streampipes.wrapper.flink.consumer;
 
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Queue;
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java
index 81eb2551c..f4a0f77f8 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java
@@ -17,12 +17,13 @@
  */
 package org.apache.streampipes.wrapper.flink.consumer;
 
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.mqtt.MqttConsumer;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,8 +39,8 @@ public class MqttFlinkConsumer implements SourceFunction<Map<String, Object>>, S
   private final MqttTransportProtocol protocol;
   private final MqttConsumer mqttConsumer;
   private final SpDataFormatDefinition spDataFormatDefinition;
-  private Boolean isRunning;
   private final Queue<byte[]> queue;
+  private Boolean isRunning;
 
   public MqttFlinkConsumer(MqttTransportProtocol protocol, SpDataFormatDefinition spDataFormatDefinition) {
     this.protocol = protocol;
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java
index 0e91c76d0..8d5d4dbb0 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java
@@ -17,11 +17,12 @@
  */
 package org.apache.streampipes.wrapper.flink.converter;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.EventConverter;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
 import java.util.Map;
 
 public class EventToMapConverter implements FlatMapFunction<Event, Map<String, Object>> {
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java
index 5c5b11ab5..2e1e01983 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java
@@ -17,23 +17,24 @@
  */
 package org.apache.streampipes.wrapper.flink.converter;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
 import java.util.Map;
 
-public class MapToEventConverter<RP extends RuntimeParams<?, ?, ?>> implements
-        FlatMapFunction<Map<String,
+public class MapToEventConverter<T extends RuntimeParams<?, ?, ?>> implements
+    FlatMapFunction<Map<String,
         Object>, Event> {
 
   private static final long serialVersionUID = 1L;
 
-  private RP runtimeParams;
+  private T runtimeParams;
   private String sourceId;
 
-  public MapToEventConverter(String sourceId, RP runtimeParams) {
+  public MapToEventConverter(String sourceId, T runtimeParams) {
     this.sourceId = sourceId;
     this.runtimeParams = runtimeParams;
   }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java
index 79d847fc9..1fc4f1707 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java
@@ -18,25 +18,26 @@
 
 package org.apache.streampipes.wrapper.flink.logger;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
 import org.apache.streampipes.logging.impl.EventStatisticLogger;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.runtime.Event;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
 public class StatisticLogger implements FlatMapFunction<Event, Event> {
 
-    private InvocableStreamPipesEntity graph;
+  private InvocableStreamPipesEntity graph;
 
-    public StatisticLogger(InvocableStreamPipesEntity graph) {
-        this.graph = graph;
-    }
+  public StatisticLogger(InvocableStreamPipesEntity graph) {
+    this.graph = graph;
+  }
 
-    @Override
-    public void flatMap(Event in, Collector<Event> out) throws Exception {
-        EventStatisticLogger.log(graph.getName(), graph.getCorrespondingPipeline(), graph.getUri());
-        out.collect(in);
-    }
+  @Override
+  public void flatMap(Event in, Collector<Event> out) throws Exception {
+    EventStatisticLogger.log(graph.getName(), graph.getCorrespondingPipeline(), graph.getUri());
+    out.collect(in);
+  }
 }
 
 
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java
index 82020e0d6..81b195fd4 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java
@@ -17,10 +17,11 @@
  */
 package org.apache.streampipes.wrapper.flink.serializer;
 
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+
 import java.io.IOException;
 import java.util.Map;
 
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java
index a36373b39..bc5354b11 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java
@@ -17,10 +17,11 @@
  */
 package org.apache.streampipes.wrapper.flink.serializer;
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
 import java.util.Map;
 
 public class ByteArraySerializer implements SerializationSchema<Map<String, Object>> {
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java
index df10f121d..e6834e7de 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java
@@ -24,22 +24,22 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import java.util.Map;
 
-public class SimpleJmsSerializer  implements SerializationSchema<Map<String, Object>>{
-
-	private ObjectMapper objectMapper;
-	
-	public SimpleJmsSerializer() {
-		this.objectMapper = new ObjectMapper();
-	}
-	
-	@Override
-	public byte[] serialize(Map<String, Object> payload) {
-		try {
-			return objectMapper.writeValueAsBytes(payload);
-		} catch (JsonProcessingException e) {
-			e.printStackTrace();
-			return null;
-		}
-	}
+public class SimpleJmsSerializer implements SerializationSchema<Map<String, Object>> {
+
+  private ObjectMapper objectMapper;
+
+  public SimpleJmsSerializer() {
+    this.objectMapper = new ObjectMapper();
+  }
+
+  @Override
+  public byte[] serialize(Map<String, Object> payload) {
+    try {
+      return objectMapper.writeValueAsBytes(payload);
+    } catch (JsonProcessingException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
 
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java
index 35270bec4..549d46c04 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java
@@ -18,12 +18,13 @@
 
 package org.apache.streampipes.wrapper.flink.sink;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.wrapper.flink.serializer.ByteArraySerializer;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
 import java.util.Map;
 
 
@@ -40,7 +41,7 @@ public class JmsFlinkProducer extends RichSinkFunction<Map<String, Object>> {
   private ActiveMQPublisher publisher;
 
   public JmsFlinkProducer(JmsTransportProtocol protocol, ByteArraySerializer
-          serializationSchema) {
+      serializationSchema) {
     this.protocol = protocol;
     this.serializationSchema = serializationSchema;
   }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java
index bda60da93..e2b6e4f82 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java
@@ -18,12 +18,13 @@
 
 package org.apache.streampipes.wrapper.flink.sink;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.streampipes.messaging.mqtt.MqttPublisher;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.wrapper.flink.serializer.ByteArraySerializer;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
 import java.util.Map;
 
 
@@ -40,7 +41,7 @@ public class MqttFlinkProducer extends RichSinkFunction<Map<String, Object>> {
   private MqttPublisher publisher;
 
   public MqttFlinkProducer(MqttTransportProtocol protocol, ByteArraySerializer
-          serializationSchema) {
+      serializationSchema) {
     this.protocol = protocol;
     this.serializationSchema = serializationSchema;
   }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java
index 8eb3c19dd..123dbf1e8 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java
@@ -18,9 +18,10 @@
 
 package org.apache.streampipes.wrapper.flink.status;
 
-import com.google.gson.Gson;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 
+import com.google.gson.Gson;
+
 import java.io.Serializable;
 
 public class PipelineElementStatusSender implements Serializable {
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java
index 76155bed8..df5fd0890 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java
@@ -24,19 +24,19 @@ import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
 
 public class PipelineElementStatusSenderFactory {
 
-  public static <I extends InvocableStreamPipesEntity> PipelineElementStatusSender getStatusSender(I graph) {
+  public static <T extends InvocableStreamPipesEntity> PipelineElementStatusSender getStatusSender(T graph) {
 
     SpKafkaProducer kafkaProducer = new SpKafkaProducer();
     // TODO refactor
 
     return new PipelineElementStatusSender(kafkaProducer,
-            graph.getStatusInfoSettings().getErrorTopic(),
-            graph.getStatusInfoSettings().getStatsTopic());
+        graph.getStatusInfoSettings().getErrorTopic(),
+        graph.getStatusInfoSettings().getStatsTopic());
   }
 
-  private static <I extends InvocableStreamPipesEntity> String buildKafkaUrl(I graph) {
+  private static <T extends InvocableStreamPipesEntity> String buildKafkaUrl(T graph) {
 
     ElementStatusInfoSettings settings = graph.getStatusInfoSettings();
-    return settings.getKafkaHost() +":" +settings.getKafkaPort();
+    return settings.getKafkaHost() + ":" + settings.getKafkaPort();
   }
 }


[streampipes] 02/07: add checkstyle to streampipes-wrapper

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 0ae87fd2949c21f8307e2a93a34ec380b069d99c
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 20:47:32 2022 +0100

    add checkstyle to streampipes-wrapper
---
 .../processor/textfilter/TextFilterProcessor.java  |  2 +-
 streampipes-wrapper/pom.xml                        | 81 ++++++++++++----------
 .../context/SpEventProcessorRuntimeContext.java    |  2 +-
 .../wrapper/context/SpEventSinkRuntimeContext.java |  2 +-
 .../wrapper/declarer/EventProcessorDeclarer.java   | 31 +++++----
 .../wrapper/declarer/EventSinkDeclarer.java        |  8 +--
 .../wrapper/declarer/PipelineElementDeclarer.java  | 22 +++---
 .../wrapper/params/binding/BindingParams.java      | 21 +++---
 .../binding/EventProcessorBindingParams.java       | 13 ++--
 .../wrapper/params/binding/InputStreamParams.java  |  6 +-
 .../wrapper/params/binding/OutputStreamParams.java |  4 +-
 .../runtime/EventProcessorRuntimeParams.java       |  6 +-
 .../params/runtime/EventSinkRuntimeParams.java     |  6 +-
 .../wrapper/params/runtime/RuntimeParams.java      | 24 +++----
 .../wrapper/routing/PipelineElementCollector.java  |  4 +-
 .../wrapper/routing/SpInputCollector.java          |  2 +-
 .../wrapper/routing/SpOutputCollector.java         |  2 +-
 .../wrapper/runtime/EventProcessor.java            |  9 +--
 .../streampipes/wrapper/runtime/EventSink.java     |  8 +--
 .../wrapper/runtime/ExternalEventProcessor.java    |  8 +--
 .../wrapper/runtime/ExternalEventSink.java         |  8 +--
 .../wrapper/runtime/PipelineElement.java           |  4 +-
 .../wrapper/runtime/PipelineElementRuntime.java    |  3 +-
 23 files changed, 141 insertions(+), 135 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TextFilterProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TextFilterProcessor.java
index d6897160f..0a9bc049a 100644
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TextFilterProcessor.java
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TextFilterProcessor.java
@@ -72,7 +72,7 @@ public class TextFilterProcessor extends StreamPipesDataProcessor {
         StringOperator.valueOf(processorParams.extractor().selectedSingleValue(OPERATION_ID, String.class));
     this.filterProperty = processorParams.extractor().mappingPropertyValue(MAPPING_PROPERTY_ID);
 
-    logger.info("Text Property: " + filterProperty);
+    LOGGER.info("Text Property: " + filterProperty);
   }
 
   @Override
diff --git a/streampipes-wrapper/pom.xml b/streampipes-wrapper/pom.xml
index 422b6754d..c462ff34e 100644
--- a/streampipes-wrapper/pom.xml
+++ b/streampipes-wrapper/pom.xml
@@ -16,46 +16,55 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.streampipes</groupId>
-		<artifactId>streampipes-parent</artifactId>
-		<version>0.91.0-SNAPSHOT</version>
-	</parent>
-	<artifactId>streampipes-wrapper</artifactId>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampipes</groupId>
+        <artifactId>streampipes-parent</artifactId>
+        <version>0.91.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>streampipes-wrapper</artifactId>
 
-	<dependencies>
-		<!-- StreamPipes dependencies -->
-		<dependency>
-			<groupId>org.apache.streampipes</groupId>
-			<artifactId>streampipes-container</artifactId>
-			<version>0.91.0-SNAPSHOT</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.streampipes</groupId>
-			<artifactId>streampipes-service-extensions-base</artifactId>
-			<version>0.91.0-SNAPSHOT</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.streampipes</groupId>
-			<artifactId>streampipes-client</artifactId>
-			<version>0.91.0-SNAPSHOT</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.streampipes</groupId>
-			<artifactId>streampipes-dataformat</artifactId>
-			<version>0.91.0-SNAPSHOT</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.streampipes</groupId>
-			<artifactId>streampipes-messaging</artifactId>
-			<version>0.91.0-SNAPSHOT</version>
-		</dependency>
+    <dependencies>
+        <!-- StreamPipes dependencies -->
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-container</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-service-extensions-base</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-client</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-dataformat</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-messaging</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-sdk</artifactId>
-			<version>0.91.0-SNAPSHOT</version>
+            <version>0.91.0-SNAPSHOT</version>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventProcessorRuntimeContext.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventProcessorRuntimeContext.java
index a0a2c8cbd..fa0f93001 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventProcessorRuntimeContext.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventProcessorRuntimeContext.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
 import java.util.List;
 
 public class SpEventProcessorRuntimeContext extends SpRuntimeContext implements
-        EventProcessorRuntimeContext, Serializable {
+    EventProcessorRuntimeContext, Serializable {
 
   private SchemaInfo outputSchemaInfo;
   private SourceInfo outputSourceInfo;
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventSinkRuntimeContext.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventSinkRuntimeContext.java
index fc5069462..dbd36209a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventSinkRuntimeContext.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventSinkRuntimeContext.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
 import java.util.List;
 
 public class SpEventSinkRuntimeContext extends SpRuntimeContext implements
-        EventSinkRuntimeContext, Serializable {
+    EventSinkRuntimeContext, Serializable {
 
   public SpEventSinkRuntimeContext(List<SourceInfo> sourceInfo,
                                    List<SchemaInfo> inputSchemaInfo,
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
index a8427daf0..f292013c6 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventProcessorDeclarer.java
@@ -18,8 +18,6 @@
 
 package org.apache.streampipes.wrapper.declarer;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -27,21 +25,24 @@ import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.runtime.PipelineElementRuntime;
 
-public abstract class EventProcessorDeclarer<B extends EventProcessorBindingParams, EPR extends
-				PipelineElementRuntime> extends PipelineElementDeclarer<B, EPR, DataProcessorInvocation,
-				ProcessingElementParameterExtractor> implements
-				SemanticEventProcessingAgentDeclarer {
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class EventProcessorDeclarer<K extends EventProcessorBindingParams, V extends
+    PipelineElementRuntime> extends PipelineElementDeclarer<K, V, DataProcessorInvocation,
+    ProcessingElementParameterExtractor> implements
+    SemanticEventProcessingAgentDeclarer {
 
-	public static final Logger logger = LoggerFactory.getLogger(EventProcessorDeclarer.class.getCanonicalName());
+  public static final Logger LOGGER = LoggerFactory.getLogger(EventProcessorDeclarer.class.getCanonicalName());
 
-	@Override
-	protected ProcessingElementParameterExtractor getExtractor(DataProcessorInvocation graph) {
-		return ProcessingElementParameterExtractor.from(graph);
-	}
+  @Override
+  protected ProcessingElementParameterExtractor getExtractor(DataProcessorInvocation graph) {
+    return ProcessingElementParameterExtractor.from(graph);
+  }
 
-	@Override
-	public Response invokeRuntime(DataProcessorInvocation graph, String serviceId) {
-		return invokeEPRuntime(graph, serviceId);
-	}
+  @Override
+  public Response invokeRuntime(DataProcessorInvocation graph, String serviceId) {
+    return invokeEPRuntime(graph, serviceId);
+  }
 
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventSinkDeclarer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventSinkDeclarer.java
index 03974e01f..a6c4f5e34 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventSinkDeclarer.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/EventSinkDeclarer.java
@@ -25,10 +25,10 @@ import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 import org.apache.streampipes.wrapper.runtime.PipelineElementRuntime;
 
-public abstract class EventSinkDeclarer<B extends EventSinkBindingParams, ES extends
-        PipelineElementRuntime>
-        extends PipelineElementDeclarer<B, ES, DataSinkInvocation,
-        DataSinkParameterExtractor> implements SemanticEventConsumerDeclarer {
+public abstract class EventSinkDeclarer<K extends EventSinkBindingParams, V extends
+    PipelineElementRuntime>
+    extends PipelineElementDeclarer<K, V, DataSinkInvocation,
+    DataSinkParameterExtractor> implements SemanticEventConsumerDeclarer {
 
   @Override
   protected DataSinkParameterExtractor getExtractor(DataSinkInvocation graph) {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/PipelineElementDeclarer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/PipelineElementDeclarer.java
index e45136198..06f4fc162 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/PipelineElementDeclarer.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/PipelineElementDeclarer.java
@@ -23,18 +23,18 @@ import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.sdk.extractor.AbstractParameterExtractor;
+import org.apache.streampipes.service.extensions.base.client.StreamPipesClientResolver;
 import org.apache.streampipes.wrapper.params.binding.BindingParams;
 import org.apache.streampipes.wrapper.runtime.PipelineElementRuntime;
-import org.apache.streampipes.service.extensions.base.client.StreamPipesClientResolver;
 
-public abstract class PipelineElementDeclarer<B extends BindingParams, EPR extends
-        PipelineElementRuntime, I
-        extends InvocableStreamPipesEntity, EX extends AbstractParameterExtractor<I>> {
+public abstract class PipelineElementDeclarer<T extends BindingParams, V extends
+    PipelineElementRuntime, W
+    extends InvocableStreamPipesEntity, X extends AbstractParameterExtractor<W>> {
 
-  protected EPR epRuntime;
+  protected V epRuntime;
   protected String elementId;
 
-  public Response invokeEPRuntime(I graph, String serviceId) {
+  public Response invokeEPRuntime(W graph, String serviceId) {
 
     try {
       elementId = graph.getElementId();
@@ -61,12 +61,12 @@ public abstract class PipelineElementDeclarer<B extends BindingParams, EPR exten
     }
   }
 
-  protected abstract EX getExtractor(I graph);
+  protected abstract X getExtractor(W graph);
 
-  public abstract EPR getRuntime(I graph,
-                                 EX extractor,
-                                 ConfigExtractor configExtractor,
-                                 StreamPipesClient streamPipesClient);
+  public abstract V getRuntime(W graph,
+                               X extractor,
+                               ConfigExtractor configExtractor,
+                               StreamPipesClient streamPipesClient);
 
   private ConfigExtractor makeConfigExtractor(String serviceId) {
     return ConfigExtractor.from(serviceId);
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/BindingParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/BindingParams.java
index e5976fd6d..b751f31bb 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/BindingParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/BindingParams.java
@@ -28,15 +28,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public abstract class BindingParams<I extends InvocableStreamPipesEntity> implements Serializable {
+public abstract class BindingParams<T extends InvocableStreamPipesEntity> implements Serializable {
   private static final long serialVersionUID = 1L;
-
-  protected I graph;
-  private List<InputStreamParams> inputStreamParams = new ArrayList<>();
-
   private final Map<String, Map<String, Object>> inEventTypes;
+  protected T graph;
+  private List<InputStreamParams> inputStreamParams = new ArrayList<>();
 
-  BindingParams(I graph) {
+  BindingParams(T graph) {
     this.graph = graph;
     this.inEventTypes = new HashMap<>();
     buildInEventTypes();
@@ -45,19 +43,20 @@ public abstract class BindingParams<I extends InvocableStreamPipesEntity> implem
 
   private void buildInEventTypes() {
     graph.getInputStreams().forEach(is ->
-            inEventTypes.put(is.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName(), SchemaUtils
-                    .toRuntimeMap
-                            (is.getEventSchema().getEventProperties())));
+        inEventTypes.put(is.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName(),
+            SchemaUtils
+                .toRuntimeMap
+                    (is.getEventSchema().getEventProperties())));
   }
 
   private void buildInputStreamParams() {
     for (int i = 0; i < graph.getInputStreams().size(); i++) {
       inputStreamParams.add(new InputStreamParams(i, graph.getInputStreams().get(i),
-              getRenameRules()));
+          getRenameRules()));
     }
   }
 
-  public I getGraph() {
+  public T getGraph() {
     return graph;
   }
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
index 79971e03a..ce70d01f9 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
@@ -31,20 +31,15 @@ import java.util.stream.Collectors;
 
 @Deprecated(since = "0.70.0", forRemoval = true)
 public abstract class EventProcessorBindingParams extends
-        BindingParams<DataProcessorInvocation> implements
-        Serializable {
+    BindingParams<DataProcessorInvocation> implements
+    Serializable {
 
   private static final long serialVersionUID = 7716492945641719007L;
-
+  private final Map<String, Object> outEventType;
   private SpDataStream outputStream;
   private String outName;
-
-  private final Map<String, Object> outEventType;
   private OutputStreamParams outputStreamParams;
 
-
-  private final static String topicPrefix = "topic://";
-
   public EventProcessorBindingParams(DataProcessorInvocation graph) {
     super(new DataProcessorInvocation(graph));
     this.outEventType = SchemaUtils.toRuntimeMap(graph.getOutputStream().getEventSchema().getEventProperties());
@@ -70,7 +65,7 @@ public abstract class EventProcessorBindingParams extends
   @Override
   public List<PropertyRenameRule> getRenameRules() {
     return graph.getOutputStrategies().stream().flatMap(o -> o.getRenameRules().stream()).collect
-            (Collectors.toList());
+        (Collectors.toList());
   }
 
   public OutputStreamParams getOutputStreamParams() {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/InputStreamParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/InputStreamParams.java
index 6db121aed..a63d58492 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/InputStreamParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/InputStreamParams.java
@@ -41,7 +41,7 @@ public class InputStreamParams implements Serializable {
   private SchemaInfo schemaInfo;
 
   public InputStreamParams(Integer streamId, SpDataStream inputStream, List<PropertyRenameRule>
-          propertyRenameRules) {
+      propertyRenameRules) {
     super();
     this.eventGrounding = inputStream.getEventGrounding();
     this.inName = eventGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName();
@@ -56,12 +56,12 @@ public class InputStreamParams implements Serializable {
 
   private SourceInfo makeSourceInfo(Integer streamId) {
     return new SourceInfo(eventGrounding.getTransportProtocol().getTopicDefinition()
-            .getActualTopicName(), makeStreamPrefix(streamId));
+        .getActualTopicName(), makeStreamPrefix(streamId));
   }
 
   private String makeStreamPrefix(Integer streamId) {
     return streamId == 0 ? PropertySelectorConstants.FIRST_STREAM_ID_PREFIX :
-            PropertySelectorConstants.SECOND_STREAM_ID_PREFIX;
+        PropertySelectorConstants.SECOND_STREAM_ID_PREFIX;
   }
 
   public EventGrounding getEventGrounding() {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/OutputStreamParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/OutputStreamParams.java
index c92a20227..9272c1af5 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/OutputStreamParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/OutputStreamParams.java
@@ -33,7 +33,7 @@ public class OutputStreamParams implements Serializable {
   private SchemaInfo schemaInfo;
 
   public OutputStreamParams(SpDataStream outputStream, List<PropertyRenameRule>
-          propertyRenameRules) {
+      propertyRenameRules) {
     this.schemaInfo = makeSchemaInfo(outputStream.getEventSchema(), propertyRenameRules);
     this.sourceInfo = makeSourceInfo(outputStream.getEventGrounding());
 
@@ -45,7 +45,7 @@ public class OutputStreamParams implements Serializable {
 
   private SourceInfo makeSourceInfo(EventGrounding eventGrounding) {
     return new SourceInfo(eventGrounding.getTransportProtocol().getTopicDefinition()
-            .getActualTopicName(), "o");
+        .getActualTopicName(), "o");
   }
 
   public SourceInfo getSourceInfo() {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventProcessorRuntimeParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventProcessorRuntimeParams.java
index 4dba839bc..b5ebaeeb0 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventProcessorRuntimeParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventProcessorRuntimeParams.java
@@ -28,10 +28,10 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
 
 import java.io.Serializable;
 
-public class EventProcessorRuntimeParams<B extends EventProcessorBindingParams> extends
-    RuntimeParams<B, DataProcessorInvocation, EventProcessorRuntimeContext> implements Serializable {
+public class EventProcessorRuntimeParams<T extends EventProcessorBindingParams> extends
+    RuntimeParams<T, DataProcessorInvocation, EventProcessorRuntimeContext> implements Serializable {
 
-  public EventProcessorRuntimeParams(B bindingParams,
+  public EventProcessorRuntimeParams(T bindingParams,
                                      Boolean singletonEngine,
                                      ConfigExtractor configExtractor,
                                      StreamPipesClient streamPipesClient) {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventSinkRuntimeParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventSinkRuntimeParams.java
index ec8550e64..cc92de847 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventSinkRuntimeParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventSinkRuntimeParams.java
@@ -26,10 +26,10 @@ import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.context.SpEventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 
-public class EventSinkRuntimeParams<B extends EventSinkBindingParams> extends
-    RuntimeParams<B, DataSinkInvocation, EventSinkRuntimeContext> {
+public class EventSinkRuntimeParams<T extends EventSinkBindingParams> extends
+    RuntimeParams<T, DataSinkInvocation, EventSinkRuntimeContext> {
 
-  public EventSinkRuntimeParams(B bindingParams,
+  public EventSinkRuntimeParams(T bindingParams,
                                 Boolean singletonEngine,
                                 ConfigExtractor configExtractor,
                                 StreamPipesClient streamPipesClient) {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/RuntimeParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/RuntimeParams.java
index d866a15d7..b5650ef5c 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/RuntimeParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/RuntimeParams.java
@@ -35,11 +35,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public abstract class RuntimeParams<B extends BindingParams<I>, I extends
-        InvocableStreamPipesEntity, RC extends RuntimeContext> implements Serializable {
+public abstract class RuntimeParams<V extends BindingParams<W>, W extends
+    InvocableStreamPipesEntity, X extends RuntimeContext> implements Serializable {
 
-  protected final B bindingParams;
-  protected RC runtimeContext;
+  protected final V bindingParams;
+  protected X runtimeContext;
   protected ConfigExtractor configExtractor;
   protected StreamPipesClient streamPipesClient;
 
@@ -47,7 +47,7 @@ public abstract class RuntimeParams<B extends BindingParams<I>, I extends
 
   private Boolean singletonEngine;
 
-  public RuntimeParams(B bindingParams,
+  public RuntimeParams(V bindingParams,
                        Boolean singletonEngine,
                        ConfigExtractor configExtractor,
                        StreamPipesClient streamPipesClient) {
@@ -59,32 +59,32 @@ public abstract class RuntimeParams<B extends BindingParams<I>, I extends
     this.runtimeContext = makeRuntimeContext();
   }
 
-  public B getBindingParams() {
+  public V getBindingParams() {
     return bindingParams;
   }
 
   private void buildEventInfoMap() {
     for (int i = 0; i < bindingParams.getInputStreamParams().size(); i++) {
       String sourceInfo = bindingParams.getInputStreamParams().get(i).getSourceInfo()
-              .getSourceId();
+          .getSourceId();
       eventInfoMap.put(sourceInfo, i);
     }
   }
 
   public Event makeEvent(Map<String, Object> mapEvent, String sourceId) {
     return EventFactory.fromMap(mapEvent, getSourceInfo(getIndex(sourceId)), getSchemaInfo
-            (getIndex(sourceId)));
+        (getIndex(sourceId)));
 
   }
 
   public List<SourceInfo> getSourceInfo() {
     return bindingParams.getInputStreamParams().size() == 1 ? Collections.singletonList
-            (getSourceInfo(0)) : Arrays.asList(getSourceInfo(0), getSourceInfo(1));
+        (getSourceInfo(0)) : Arrays.asList(getSourceInfo(0), getSourceInfo(1));
   }
 
   public List<SchemaInfo> getSchemaInfo() {
     return bindingParams.getInputStreamParams().size() == 1 ? Collections.singletonList
-            (getSchemaInfo(0)) : Arrays.asList(getSchemaInfo(0), getSchemaInfo(1));
+        (getSchemaInfo(0)) : Arrays.asList(getSchemaInfo(0), getSchemaInfo(1));
   }
 
   public SourceInfo getSourceInfo(Integer index) {
@@ -99,7 +99,7 @@ public abstract class RuntimeParams<B extends BindingParams<I>, I extends
     return eventInfoMap.get(sourceId);
   }
 
-  public RC getRuntimeContext() {
+  public X getRuntimeContext() {
     return runtimeContext;
   }
 
@@ -107,6 +107,6 @@ public abstract class RuntimeParams<B extends BindingParams<I>, I extends
     return singletonEngine;
   }
 
-  protected abstract RC makeRuntimeContext();
+  protected abstract X makeRuntimeContext();
 
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/PipelineElementCollector.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/PipelineElementCollector.java
index 4fcd10480..1095395ce 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/PipelineElementCollector.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/PipelineElementCollector.java
@@ -20,9 +20,9 @@ package org.apache.streampipes.wrapper.routing;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 
-public interface PipelineElementCollector<C> {
+public interface PipelineElementCollector<T> {
 
-  void registerConsumer(String routeId, C consumer);
+  void registerConsumer(String routeId, T consumer);
 
   void unregisterConsumer(String routeId);
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/SpInputCollector.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/SpInputCollector.java
index 48e3cd582..275f3fc97 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/SpInputCollector.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/SpInputCollector.java
@@ -19,6 +19,6 @@
 package org.apache.streampipes.wrapper.routing;
 
 public interface SpInputCollector extends
-        PipelineElementCollector<RawDataProcessor> {
+    PipelineElementCollector<RawDataProcessor> {
 
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/SpOutputCollector.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/SpOutputCollector.java
index b57a5a60c..69ab08476 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/SpOutputCollector.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/routing/SpOutputCollector.java
@@ -24,7 +24,7 @@ import org.apache.streampipes.model.runtime.Event;
 import java.util.Map;
 
 public interface SpOutputCollector extends PipelineElementCollector<InternalEventProcessor<Map<String,
-        Object>>> {
+    Object>>> {
 
   void collect(Event event);
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
index f6538321a..a5388a7a2 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
@@ -26,11 +26,12 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 
 @Deprecated(since = "0.70.0", forRemoval = true)
-public interface EventProcessor<B extends EventProcessorBindingParams> extends
-        PipelineElement<B, DataProcessorInvocation> {
+public interface EventProcessor<T extends EventProcessorBindingParams> extends
+    PipelineElement<T, DataProcessorInvocation> {
 
-  void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws
-          SpRuntimeException;
+  void onInvocation(T parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext)
+      throws
+      SpRuntimeException;
 
   void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException;
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventSink.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventSink.java
index 509d283dc..c6bc8ff50 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventSink.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventSink.java
@@ -24,11 +24,11 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 
-public interface EventSink<B extends EventSinkBindingParams> extends PipelineElement<B,
-        DataSinkInvocation> {
+public interface EventSink<T extends EventSinkBindingParams> extends PipelineElement<T,
+    DataSinkInvocation> {
 
-  void onInvocation(B parameters, EventSinkRuntimeContext runtimeContext) throws
-          SpRuntimeException;
+  void onInvocation(T parameters, EventSinkRuntimeContext runtimeContext) throws
+      SpRuntimeException;
 
   void onEvent(Event event) throws SpRuntimeException;
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEventProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEventProcessor.java
index 53cd8028c..48305bc3f 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEventProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEventProcessor.java
@@ -22,10 +22,10 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
-public interface ExternalEventProcessor<B extends EventProcessorBindingParams> extends
-        PipelineElement<B, DataProcessorInvocation>  {
+public interface ExternalEventProcessor<T extends EventProcessorBindingParams> extends
+    PipelineElement<T, DataProcessorInvocation> {
 
-  void onInvocation(B parameters, EventProcessorRuntimeContext runtimeContext) throws
-          SpRuntimeException;
+  void onInvocation(T parameters, EventProcessorRuntimeContext runtimeContext) throws
+      SpRuntimeException;
 
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEventSink.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEventSink.java
index 1624098c7..426bf1c61 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEventSink.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEventSink.java
@@ -22,9 +22,9 @@ import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 
-public interface ExternalEventSink<B extends EventSinkBindingParams> extends PipelineElement<B,
-        DataSinkInvocation> {
+public interface ExternalEventSink<T extends EventSinkBindingParams> extends PipelineElement<T,
+    DataSinkInvocation> {
 
-  void onInvocation(B parameters, EventSinkRuntimeContext runtimeContext) throws
-          SpRuntimeException;
+  void onInvocation(T parameters, EventSinkRuntimeContext runtimeContext) throws
+      SpRuntimeException;
 }
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElement.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElement.java
index db2604736..4041fef36 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElement.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElement.java
@@ -22,8 +22,8 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.wrapper.params.binding.BindingParams;
 
-public interface PipelineElement<B extends BindingParams<I>, I extends
-        InvocableStreamPipesEntity> {
+public interface PipelineElement<K extends BindingParams<V>, V extends
+    InvocableStreamPipesEntity> {
 
   void onDetach() throws SpRuntimeException;
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElementRuntime.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElementRuntime.java
index 84577d977..c908531c1 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElementRuntime.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElementRuntime.java
@@ -18,9 +18,10 @@
 
 package org.apache.streampipes.wrapper.runtime;
 
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 
+import org.apache.commons.lang3.RandomStringUtils;
+
 public abstract class PipelineElementRuntime {
 
   protected String instanceId;