You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/22 20:09:17 UTC

[streampipes] 05/07: add checkstyle to streampipes-wrapper-flink

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 1e73b24fd1efd659822289bd7dfc0bb07d627c1c
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:06:56 2022 +0100

    add checkstyle to streampipes-wrapper-flink
---
 streampipes-wrapper-flink/pom.xml                  | 11 ++-
 .../wrapper/flink/FlinkDataProcessorDeclarer.java  |  4 +-
 .../wrapper/flink/FlinkDataProcessorRuntime.java   | 32 ++++-----
 .../wrapper/flink/FlinkDataSinkDeclarer.java       |  4 +-
 .../wrapper/flink/FlinkDataSinkRuntime.java        | 15 +++--
 .../flink/FlinkMiniClusterDeploymentConfig.java    |  6 +-
 .../streampipes/wrapper/flink/FlinkRuntime.java    | 78 ++++++++++++----------
 .../wrapper/flink/FlinkSpMiniCluster.java          |  8 +--
 .../wrapper/flink/consumer/JmsFlinkConsumer.java   |  7 +-
 .../wrapper/flink/consumer/MqttFlinkConsumer.java  |  5 +-
 .../flink/converter/EventToMapConverter.java       |  5 +-
 .../flink/converter/MapToEventConverter.java       | 13 ++--
 .../wrapper/flink/logger/StatisticLogger.java      | 23 ++++---
 .../flink/serializer/ByteArrayDeserializer.java    |  3 +-
 .../flink/serializer/ByteArraySerializer.java      |  3 +-
 .../flink/serializer/SimpleJmsSerializer.java      | 34 +++++-----
 .../wrapper/flink/sink/JmsFlinkProducer.java       |  7 +-
 .../wrapper/flink/sink/MqttFlinkProducer.java      |  7 +-
 .../flink/status/PipelineElementStatusSender.java  |  3 +-
 .../status/PipelineElementStatusSenderFactory.java | 10 +--
 20 files changed, 154 insertions(+), 124 deletions(-)

diff --git a/streampipes-wrapper-flink/pom.xml b/streampipes-wrapper-flink/pom.xml
index 6208b71b8..170f966cb 100644
--- a/streampipes-wrapper-flink/pom.xml
+++ b/streampipes-wrapper-flink/pom.xml
@@ -16,7 +16,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.streampipes</groupId>
@@ -111,4 +112,12 @@
             <artifactId>snappy-java</artifactId>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java
index 72405b983..d6bd199d2 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java
@@ -21,8 +21,8 @@ package org.apache.streampipes.wrapper.flink;
 import org.apache.streampipes.wrapper.declarer.EventProcessorDeclarer;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
-public abstract class FlinkDataProcessorDeclarer<B extends EventProcessorBindingParams>
-        extends EventProcessorDeclarer<B, FlinkDataProcessorRuntime<B>> {
+public abstract class FlinkDataProcessorDeclarer<T extends EventProcessorBindingParams>
+    extends EventProcessorDeclarer<T, FlinkDataProcessorRuntime<T>> {
 
 
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
index 23baa1cdf..f1269507f 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
@@ -18,8 +18,6 @@
 
 package org.apache.streampipes.wrapper.flink;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
@@ -35,6 +33,9 @@ import org.apache.streampipes.wrapper.flink.sink.JmsFlinkProducer;
 import org.apache.streampipes.wrapper.flink.sink.MqttFlinkProducer;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,15 +43,15 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
-public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingParams> extends
-        FlinkRuntime<EventProcessorRuntimeParams<B>, B,
+public abstract class FlinkDataProcessorRuntime<T extends EventProcessorBindingParams> extends
+    FlinkRuntime<EventProcessorRuntimeParams<T>, T,
         DataProcessorInvocation, EventProcessorRuntimeContext> {
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(FlinkDataProcessorRuntime.class);
 
 
-  public FlinkDataProcessorRuntime(B params,
+  public FlinkDataProcessorRuntime(T params,
                                    ConfigExtractor configExtractor,
                                    StreamPipesClient streamPipesClient) {
     super(params, configExtractor, streamPipesClient);
@@ -59,25 +60,25 @@ public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingP
   @SuppressWarnings("deprecation")
   public void appendExecutionConfig(DataStream<Event>... convertedStream) {
     DataStream<Map<String, Object>> applicationLogic = getApplicationLogic(convertedStream).flatMap
-            (new EventToMapConverter());
+        (new EventToMapConverter());
 
     EventGrounding outputGrounding = getOutputStream().getEventGrounding();
     SpDataFormatDefinition outputDataFormatDefinition =
-            getDataFormatDefinition(outputGrounding.getTransportFormats().get(0));
+        getDataFormatDefinition(outputGrounding.getTransportFormats().get(0));
 
     ByteArraySerializer serializer =
-            new ByteArraySerializer(outputDataFormatDefinition);
+        new ByteArraySerializer(outputDataFormatDefinition);
     if (isKafkaProtocol(getOutputStream())) {
       applicationLogic
-              .addSink(new FlinkKafkaProducer<>(getTopic(getOutputStream()),
-                      serializer,
-                      getProducerProperties((KafkaTransportProtocol) outputGrounding.getTransportProtocol())));
+          .addSink(new FlinkKafkaProducer<>(getTopic(getOutputStream()),
+              serializer,
+              getProducerProperties((KafkaTransportProtocol) outputGrounding.getTransportProtocol())));
     } else if (isJmsProtocol(getOutputStream())) {
       applicationLogic
-              .addSink(new JmsFlinkProducer(getJmsProtocol(getOutputStream()), serializer));
+          .addSink(new JmsFlinkProducer(getJmsProtocol(getOutputStream()), serializer));
     } else if (isMqttProtocol(getOutputStream())) {
       applicationLogic
-              .addSink(new MqttFlinkProducer(getMqttProtocol(getOutputStream()), serializer));
+          .addSink(new MqttFlinkProducer(getMqttProtocol(getOutputStream()), serializer));
     }
 
   }
@@ -100,9 +101,10 @@ public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingP
   }
 
   @Override
-  protected EventProcessorRuntimeParams<B> makeRuntimeParams(ConfigExtractor configExtractor,
+  protected EventProcessorRuntimeParams<T> makeRuntimeParams(ConfigExtractor configExtractor,
                                                              StreamPipesClient streamPipesClient) {
-    LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by a deployed Flink program due to non-serializable classes.");
+    LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by"
+        + " a deployed Flink program due to non-serializable classes.");
     return new EventProcessorRuntimeParams<>(bindingParams, false, null, null);
   }
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java
index 4af5d9190..a9001b6a4 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.wrapper.flink;
 import org.apache.streampipes.wrapper.declarer.EventSinkDeclarer;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 
-public abstract class FlinkDataSinkDeclarer<B extends EventSinkBindingParams>
-        extends EventSinkDeclarer<B, FlinkDataSinkRuntime<B>> {
+public abstract class FlinkDataSinkDeclarer<T extends EventSinkBindingParams>
+    extends EventSinkDeclarer<T, FlinkDataSinkRuntime<T>> {
 
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
index d633eec58..e9f062f11 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.wrapper.flink;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -26,16 +25,18 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class FlinkDataSinkRuntime<B extends EventSinkBindingParams> extends
-        FlinkRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
+public abstract class FlinkDataSinkRuntime<T extends EventSinkBindingParams> extends
+    FlinkRuntime<EventSinkRuntimeParams<T>, T, DataSinkInvocation, EventSinkRuntimeContext> {
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(FlinkDataSinkRuntime.class);
 
-  public FlinkDataSinkRuntime(B params,
+  public FlinkDataSinkRuntime(T params,
                               ConfigExtractor configExtractor,
                               StreamPipesClient streamPipesClient) {
     super(params, configExtractor, streamPipesClient);
@@ -49,9 +50,11 @@ public abstract class FlinkDataSinkRuntime<B extends EventSinkBindingParams> ext
 
   public abstract void getSink(DataStream<Event>... convertedStream1);
 
-  protected EventSinkRuntimeParams<B> makeRuntimeParams(ConfigExtractor configExtractor,
+  protected EventSinkRuntimeParams<T> makeRuntimeParams(ConfigExtractor configExtractor,
                                                         StreamPipesClient streamPipesClient) {
-    LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by a deployed Flink program due to non-serializable classes.");
+    LOG.warn(
+        "The config extractor and StreamPipes Client can currently not be accessed by"
+            + " a deployed Flink program due to non-serializable classes.");
     return new EventSinkRuntimeParams<>(bindingParams, false, null, null);
   }
 
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
index 3501c78cf..9ede5d55d 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.wrapper.flink;
 
 public class FlinkMiniClusterDeploymentConfig extends FlinkDeploymentConfig {
 
-    public FlinkMiniClusterDeploymentConfig() {
-        super("", "localhost", 6123, true);
-    }
+  public FlinkMiniClusterDeploymentConfig() {
+    super("", "localhost", 6123, true);
+  }
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
index b1f688385..e25cac0eb 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
@@ -18,25 +18,18 @@
 
 package org.apache.streampipes.wrapper.flink;
 
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.grounding.*;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.RuntimeContext;
 import org.apache.streampipes.wrapper.distributed.runtime.DistributedRuntime;
@@ -48,6 +41,19 @@ import org.apache.streampipes.wrapper.flink.serializer.ByteArrayDeserializer;
 import org.apache.streampipes.wrapper.params.binding.BindingParams;
 import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
 
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
@@ -55,10 +61,10 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Pattern;
 
-public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends BindingParams<I>, I
-        extends
-        InvocableStreamPipesEntity, RC extends RuntimeContext> extends
-        DistributedRuntime<RP, B, I, RC> implements Runnable, Serializable {
+public abstract class FlinkRuntime<T extends RuntimeParams<V, W, X>, V extends BindingParams<W>, W
+    extends
+        InvocableStreamPipesEntity, X extends RuntimeContext> extends
+    DistributedRuntime<T, V, W, X> implements Runnable, Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -66,7 +72,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
   protected FlinkDeploymentConfig config;
   private StreamExecutionEnvironment env;
 
-  public FlinkRuntime(B bindingParams,
+  public FlinkRuntime(V bindingParams,
                       ConfigExtractor configExtractor,
                       StreamPipesClient streamPipesClient) {
     super(bindingParams, configExtractor, streamPipesClient);
@@ -80,9 +86,9 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
       } else {
         FlinkSpMiniCluster.INSTANCE.start();
         FlinkSpMiniCluster
-                .INSTANCE
-                .getClusterClient()
-                .submitJob(env.getStreamGraph(bindingParams.getGraph().getElementId()).getJobGraph());
+            .INSTANCE
+            .getClusterClient()
+            .submitJob(env.getStreamGraph(bindingParams.getGraph().getElementId()).getJobGraph());
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -135,7 +141,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
   }
 
   private SourceFunction<Map<String, Object>> getJmsConsumer(JmsTransportProtocol protocol,
-                                                SpDataFormatDefinition spDataFormatDefinition) {
+                                                             SpDataFormatDefinition spDataFormatDefinition) {
     return new JmsFlinkConsumer(protocol, spDataFormatDefinition);
   }
 
@@ -148,12 +154,12 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
                                                                SpDataFormatDefinition spDataFormatDefinition) {
     if (protocol.getTopicDefinition() instanceof SimpleTopicDefinition) {
       return new FlinkKafkaConsumer<>(protocol
-              .getTopicDefinition()
-              .getActualTopicName(), new ByteArrayDeserializer(spDataFormatDefinition), getProperties(protocol));
+          .getTopicDefinition()
+          .getActualTopicName(), new ByteArrayDeserializer(spDataFormatDefinition), getProperties(protocol));
     } else {
       String patternTopic = replaceWildcardWithPatternFormat(protocol.getTopicDefinition().getActualTopicName());
       return new FlinkKafkaConsumer<>(Pattern.compile(patternTopic), new ByteArrayDeserializer(spDataFormatDefinition),
-              getProperties(protocol));
+          getProperties(protocol));
     }
   }
 
@@ -163,7 +169,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
       this.env = StreamExecutionEnvironment.createLocalEnvironment();
     } else {
       this.env = StreamExecutionEnvironment
-              .createRemoteEnvironment(config.getHost(), config.getPort(), config.getJarFile());
+          .createRemoteEnvironment(config.getHost(), config.getPort(), config.getJarFile());
     }
 
     appendEnvironmentConfig(this.env);
@@ -188,10 +194,10 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
   private DataStream<Event> addSource(SourceFunction<Map<String, Object>> sourceFunction,
                                       Integer sourceIndex) {
     return env
-            .addSource(sourceFunction)
-            .flatMap(new MapToEventConverter<>(runtimeParams.getSourceInfo(sourceIndex).getSourceId(),
-                    runtimeParams))
-            .flatMap(new StatisticLogger(getGraph()));
+        .addSource(sourceFunction)
+        .flatMap(new MapToEventConverter<>(runtimeParams.getSourceInfo(sourceIndex).getSourceId(),
+            runtimeParams))
+        .flatMap(new StatisticLogger(getGraph()));
   }
 
   @Override
@@ -219,9 +225,9 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
             count++;
             Thread.sleep(1000);
             Optional<JobStatusMessage> statusMessageOpt =
-                    getJobStatus(bindingParams.getGraph().getElementId());
+                getJobStatus(bindingParams.getGraph().getElementId());
             if (statusMessageOpt.isPresent()
-                    && statusMessageOpt.get().getJobState().name().equals("RUNNING")) {
+                && statusMessageOpt.get().getJobState().name().equals("RUNNING")) {
               isDeployed = true;
             }
 
@@ -246,7 +252,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
     try {
       ClusterClient<? extends Comparable<? extends Comparable<?>>> clusterClient = getClusterClient();
       Optional<JobStatusMessage> jobStatusMessage =
-              getJobStatus(bindingParams.getGraph().getElementId());
+          getJobStatus(bindingParams.getGraph().getElementId());
       if (jobStatusMessage.isPresent()) {
         String jobStatusStr = jobStatusMessage.get().getJobState().name();
         // Cancel the job if running
@@ -304,15 +310,15 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
 
       // First, find a job with Running status
       Optional<JobStatusMessage> job = jobsFound.stream()
-              .filter(j -> j.getJobName().equals(jobName) && j.getJobState().name().equals("RUNNING")).findFirst();
+          .filter(j -> j.getJobName().equals(jobName) && j.getJobState().name().equals("RUNNING")).findFirst();
       if (job.isPresent()) {
         return job;
       }
 
       // Otherwise return job with any other status
       return jobs.get()
-              .stream()
-              .filter(j -> j.getJobName().equals(jobName)).findFirst();
+          .stream()
+          .filter(j -> j.getJobName().equals(jobName)).findFirst();
 
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java
index 0d1260dfa..1fccbe4a7 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java
@@ -36,10 +36,10 @@ public enum FlinkSpMiniCluster {
     Configuration configuration = new Configuration();
     configuration.setString(RestOptions.BIND_PORT, "0");
     MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration
-            .Builder()
-            .setConfiguration(configuration)
-            .setNumTaskManagers(2)
-            .build();
+        .Builder()
+        .setConfiguration(configuration)
+        .setNumTaskManagers(2)
+        .build();
     this.miniCluster = new MiniCluster(miniClusterConfiguration);
   }
 
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java
index 8315e6c95..3706c818a 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java
@@ -17,15 +17,16 @@
  */
 package org.apache.streampipes.wrapper.flink.consumer;
 
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Queue;
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java
index 81eb2551c..f4a0f77f8 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java
@@ -17,12 +17,13 @@
  */
 package org.apache.streampipes.wrapper.flink.consumer;
 
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.mqtt.MqttConsumer;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,8 +39,8 @@ public class MqttFlinkConsumer implements SourceFunction<Map<String, Object>>, S
   private final MqttTransportProtocol protocol;
   private final MqttConsumer mqttConsumer;
   private final SpDataFormatDefinition spDataFormatDefinition;
-  private Boolean isRunning;
   private final Queue<byte[]> queue;
+  private Boolean isRunning;
 
   public MqttFlinkConsumer(MqttTransportProtocol protocol, SpDataFormatDefinition spDataFormatDefinition) {
     this.protocol = protocol;
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java
index 0e91c76d0..8d5d4dbb0 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java
@@ -17,11 +17,12 @@
  */
 package org.apache.streampipes.wrapper.flink.converter;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.EventConverter;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
 import java.util.Map;
 
 public class EventToMapConverter implements FlatMapFunction<Event, Map<String, Object>> {
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java
index 5c5b11ab5..2e1e01983 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java
@@ -17,23 +17,24 @@
  */
 package org.apache.streampipes.wrapper.flink.converter;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
 import java.util.Map;
 
-public class MapToEventConverter<RP extends RuntimeParams<?, ?, ?>> implements
-        FlatMapFunction<Map<String,
+public class MapToEventConverter<T extends RuntimeParams<?, ?, ?>> implements
+    FlatMapFunction<Map<String,
         Object>, Event> {
 
   private static final long serialVersionUID = 1L;
 
-  private RP runtimeParams;
+  private T runtimeParams;
   private String sourceId;
 
-  public MapToEventConverter(String sourceId, RP runtimeParams) {
+  public MapToEventConverter(String sourceId, T runtimeParams) {
     this.sourceId = sourceId;
     this.runtimeParams = runtimeParams;
   }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java
index 79d847fc9..1fc4f1707 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java
@@ -18,25 +18,26 @@
 
 package org.apache.streampipes.wrapper.flink.logger;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
 import org.apache.streampipes.logging.impl.EventStatisticLogger;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.runtime.Event;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
 public class StatisticLogger implements FlatMapFunction<Event, Event> {
 
-    private InvocableStreamPipesEntity graph;
+  private InvocableStreamPipesEntity graph;
 
-    public StatisticLogger(InvocableStreamPipesEntity graph) {
-        this.graph = graph;
-    }
+  public StatisticLogger(InvocableStreamPipesEntity graph) {
+    this.graph = graph;
+  }
 
-    @Override
-    public void flatMap(Event in, Collector<Event> out) throws Exception {
-        EventStatisticLogger.log(graph.getName(), graph.getCorrespondingPipeline(), graph.getUri());
-        out.collect(in);
-    }
+  @Override
+  public void flatMap(Event in, Collector<Event> out) throws Exception {
+    EventStatisticLogger.log(graph.getName(), graph.getCorrespondingPipeline(), graph.getUri());
+    out.collect(in);
+  }
 }
 
 
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java
index 82020e0d6..81b195fd4 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java
@@ -17,10 +17,11 @@
  */
 package org.apache.streampipes.wrapper.flink.serializer;
 
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+
 import java.io.IOException;
 import java.util.Map;
 
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java
index a36373b39..bc5354b11 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java
@@ -17,10 +17,11 @@
  */
 package org.apache.streampipes.wrapper.flink.serializer;
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
 import java.util.Map;
 
 public class ByteArraySerializer implements SerializationSchema<Map<String, Object>> {
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java
index df10f121d..e6834e7de 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java
@@ -24,22 +24,22 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import java.util.Map;
 
-public class SimpleJmsSerializer  implements SerializationSchema<Map<String, Object>>{
-
-	private ObjectMapper objectMapper;
-	
-	public SimpleJmsSerializer() {
-		this.objectMapper = new ObjectMapper();
-	}
-	
-	@Override
-	public byte[] serialize(Map<String, Object> payload) {
-		try {
-			return objectMapper.writeValueAsBytes(payload);
-		} catch (JsonProcessingException e) {
-			e.printStackTrace();
-			return null;
-		}
-	}
+public class SimpleJmsSerializer implements SerializationSchema<Map<String, Object>> {
+
+  private ObjectMapper objectMapper;
+
+  public SimpleJmsSerializer() {
+    this.objectMapper = new ObjectMapper();
+  }
+
+  @Override
+  public byte[] serialize(Map<String, Object> payload) {
+    try {
+      return objectMapper.writeValueAsBytes(payload);
+    } catch (JsonProcessingException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
 
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java
index 35270bec4..549d46c04 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java
@@ -18,12 +18,13 @@
 
 package org.apache.streampipes.wrapper.flink.sink;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.wrapper.flink.serializer.ByteArraySerializer;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
 import java.util.Map;
 
 
@@ -40,7 +41,7 @@ public class JmsFlinkProducer extends RichSinkFunction<Map<String, Object>> {
   private ActiveMQPublisher publisher;
 
   public JmsFlinkProducer(JmsTransportProtocol protocol, ByteArraySerializer
-          serializationSchema) {
+      serializationSchema) {
     this.protocol = protocol;
     this.serializationSchema = serializationSchema;
   }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java
index bda60da93..e2b6e4f82 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java
@@ -18,12 +18,13 @@
 
 package org.apache.streampipes.wrapper.flink.sink;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.streampipes.messaging.mqtt.MqttPublisher;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.wrapper.flink.serializer.ByteArraySerializer;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
 import java.util.Map;
 
 
@@ -40,7 +41,7 @@ public class MqttFlinkProducer extends RichSinkFunction<Map<String, Object>> {
   private MqttPublisher publisher;
 
   public MqttFlinkProducer(MqttTransportProtocol protocol, ByteArraySerializer
-          serializationSchema) {
+      serializationSchema) {
     this.protocol = protocol;
     this.serializationSchema = serializationSchema;
   }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java
index 8eb3c19dd..123dbf1e8 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java
@@ -18,9 +18,10 @@
 
 package org.apache.streampipes.wrapper.flink.status;
 
-import com.google.gson.Gson;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 
+import com.google.gson.Gson;
+
 import java.io.Serializable;
 
 public class PipelineElementStatusSender implements Serializable {
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java
index 76155bed8..df5fd0890 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java
@@ -24,19 +24,19 @@ import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
 
 public class PipelineElementStatusSenderFactory {
 
-  public static <I extends InvocableStreamPipesEntity> PipelineElementStatusSender getStatusSender(I graph) {
+  public static <T extends InvocableStreamPipesEntity> PipelineElementStatusSender getStatusSender(T graph) {
 
     SpKafkaProducer kafkaProducer = new SpKafkaProducer();
     // TODO refactor
 
     return new PipelineElementStatusSender(kafkaProducer,
-            graph.getStatusInfoSettings().getErrorTopic(),
-            graph.getStatusInfoSettings().getStatsTopic());
+        graph.getStatusInfoSettings().getErrorTopic(),
+        graph.getStatusInfoSettings().getStatsTopic());
   }
 
-  private static <I extends InvocableStreamPipesEntity> String buildKafkaUrl(I graph) {
+  private static <T extends InvocableStreamPipesEntity> String buildKafkaUrl(T graph) {
 
     ElementStatusInfoSettings settings = graph.getStatusInfoSettings();
-    return settings.getKafkaHost() +":" +settings.getKafkaPort();
+    return settings.getKafkaHost() + ":" + settings.getKafkaPort();
   }
 }