You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/22 20:09:18 UTC

[streampipes] 06/07: add checkstyle to streampipes-wrapper-kafka-streams

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 17622e6afe4b69ba0a588c0f5e2e689ed873cd76
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:07:48 2022 +0100

    add checkstyle to streampipes-wrapper-kafka-streams
---
 streampipes-wrapper-kafka-streams/pom.xml          | 11 ++++++++++-
 .../kafka/KafkaStreamsDataProcessorDeclarer.java   |  2 +-
 .../kafka/KafkaStreamsDataProcessorRuntime.java    | 23 +++++++++++-----------
 .../kafka/KafkaStreamsDataSinkDeclarer.java        |  4 ++--
 .../wrapper/kafka/KafkaStreamsDataSinkRuntime.java |  2 +-
 .../wrapper/kafka/KafkaStreamsRuntime.java         | 21 ++++++++++----------
 .../wrapper/kafka/converter/JsonToMapFormat.java   |  7 ++++---
 7 files changed, 41 insertions(+), 29 deletions(-)

diff --git a/streampipes-wrapper-kafka-streams/pom.xml b/streampipes-wrapper-kafka-streams/pom.xml
index 1362a7770..bf2d19657 100644
--- a/streampipes-wrapper-kafka-streams/pom.xml
+++ b/streampipes-wrapper-kafka-streams/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>streampipes-parent</artifactId>
         <groupId>org.apache.streampipes</groupId>
@@ -59,4 +60,12 @@
             </exclusions>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java
index 633f20039..36b33d975 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java
@@ -21,7 +21,7 @@ import org.apache.streampipes.wrapper.declarer.EventProcessorDeclarer;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 public abstract class KafkaStreamsDataProcessorDeclarer<B extends
-        EventProcessorBindingParams> extends EventProcessorDeclarer<B, KafkaStreamsDataProcessorRuntime> {
+    EventProcessorBindingParams> extends EventProcessorDeclarer<B, KafkaStreamsDataProcessorRuntime> {
 
 
 }
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java
index 79cbbdd29..02cd53799 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java
@@ -17,10 +17,6 @@
  */
 package org.apache.streampipes.wrapper.kafka;
 
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -32,13 +28,18 @@ import org.apache.streampipes.wrapper.kafka.converter.MapToJsonFormat;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;
 
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
 import java.util.Map;
 import java.util.regex.Pattern;
 
 public abstract class KafkaStreamsDataProcessorRuntime<B extends
-        EventProcessorBindingParams>
-        extends KafkaStreamsRuntime<EventProcessorRuntimeParams<B>, B,
-        DataProcessorInvocation, EventProcessorRuntimeContext> {
+    EventProcessorBindingParams>
+    extends KafkaStreamsRuntime<EventProcessorRuntimeParams<B>, B,
+    DataProcessorInvocation, EventProcessorRuntimeContext> {
 
 
   public KafkaStreamsDataProcessorRuntime(EventProcessorRuntimeParams<B> runtimeParams) {
@@ -53,7 +54,7 @@ public abstract class KafkaStreamsDataProcessorRuntime<B extends
       StreamsBuilder builder = new StreamsBuilder();
       SpDataStream inputStream = runtimeParams.getBindingParams().getGraph().getInputStreams().get(0);
       TransportProtocol protocol = protocol(inputStream);
-      KStream<String, String> stream ;
+      KStream<String, String> stream;
 
       if (protocol.getTopicDefinition() instanceof SimpleTopicDefinition) {
         stream = builder.stream(getTopic(inputStream));
@@ -62,12 +63,12 @@ public abstract class KafkaStreamsDataProcessorRuntime<B extends
       }
 
       KStream<String, Map<String, Object>> mapFormat = stream.flatMapValues((ValueMapper<String, Iterable<Map<String,
-              Object>>>) s -> new JsonToMapFormat(getGraph()).apply(s));
+          Object>>>) s -> new JsonToMapFormat(getGraph()).apply(s));
 
       KStream<String, String> outStream = getApplicationLogic(mapFormat).flatMapValues(new
-              MapToJsonFormat());
+          MapToJsonFormat());
       outStream.to(getTopic(runtimeParams.getBindingParams().getGraph()
-              .getOutputStream()));
+          .getOutputStream()));
       streams = new KafkaStreams(builder.build(), config);
 
       streams.start();
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java
index 36dfa4e8d..349f6b556 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java
@@ -20,6 +20,6 @@ package org.apache.streampipes.wrapper.kafka;
 import org.apache.streampipes.wrapper.declarer.EventSinkDeclarer;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 
-public abstract class KafkaStreamsDataSinkDeclarer <B extends
-        EventSinkBindingParams> extends EventSinkDeclarer<B, KafkaStreamsDataSinkRuntime> {
+public abstract class KafkaStreamsDataSinkDeclarer<B extends
+    EventSinkBindingParams> extends EventSinkDeclarer<B, KafkaStreamsDataSinkRuntime> {
 }
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java
index 9e9b8dfa2..56270602e 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java
@@ -24,7 +24,7 @@ import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
 
 public abstract class KafkaStreamsDataSinkRuntime<B extends EventSinkBindingParams>
-        extends KafkaStreamsRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
+    extends KafkaStreamsRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
 
 
   public KafkaStreamsDataSinkRuntime(EventSinkRuntimeParams<B> runtimeParams) {
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java
index cdb8d665a..cc4bd5c63 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java
@@ -17,10 +17,6 @@
  */
 package org.apache.streampipes.wrapper.kafka;
 
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.wrapper.context.RuntimeContext;
@@ -28,12 +24,17 @@ import org.apache.streampipes.wrapper.distributed.runtime.DistributedRuntime;
 import org.apache.streampipes.wrapper.params.binding.BindingParams;
 import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+
 import java.util.Map;
 import java.util.Properties;
 
 public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B extends
-        BindingParams<I>, I extends InvocableStreamPipesEntity, RC extends RuntimeContext> extends
-        DistributedRuntime<RP, B, I, RC> {
+    BindingParams<I>, I extends InvocableStreamPipesEntity, RC extends RuntimeContext> extends
+    DistributedRuntime<RP, B, I, RC> {
 
   Properties config;
   KafkaStreams streams;
@@ -46,10 +47,10 @@ public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B
   public void prepareRuntime() throws SpRuntimeException {
     config = new Properties();
     config.put(StreamsConfig.APPLICATION_ID_CONFIG, gneerateApplicationId(runtimeParams.getBindingParams()
-            .getGraph()
-            .getElementId()));
+        .getGraph()
+        .getElementId()));
     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaUrl(runtimeParams.getBindingParams().getGraph()
-            .getInputStreams().get(0)));
+        .getInputStreams().get(0)));
     config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
     config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
   }
@@ -69,6 +70,6 @@ public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B
   }
 
   protected abstract KStream<String, Map<String, Object>> getApplicationLogic(KStream<String, Map<String, Object>>...
-                                                                           inputStreams);
+                                                                                  inputStreams);
 
 }
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java
index 7173d7c40..9a077d313 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java
@@ -17,18 +17,19 @@
  */
 package org.apache.streampipes.wrapper.kafka.converter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.streampipes.logging.impl.EventStatisticLogger;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 public class JsonToMapFormat implements ValueMapper<String, Iterable<Map<String,
-        Object>>> {
+    Object>>> {
 
   private ObjectMapper mapper;
   private InvocableStreamPipesEntity graph;