You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/22 20:09:18 UTC
[streampipes] 06/07: add checkstyle to streampipes-wrapper-kafka-streams
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 17622e6afe4b69ba0a588c0f5e2e689ed873cd76
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:07:48 2022 +0100
add checkstyle to streampipes-wrapper-kafka-streams
---
streampipes-wrapper-kafka-streams/pom.xml | 11 ++++++++++-
.../kafka/KafkaStreamsDataProcessorDeclarer.java | 2 +-
.../kafka/KafkaStreamsDataProcessorRuntime.java | 23 +++++++++++-----------
.../kafka/KafkaStreamsDataSinkDeclarer.java | 4 ++--
.../wrapper/kafka/KafkaStreamsDataSinkRuntime.java | 2 +-
.../wrapper/kafka/KafkaStreamsRuntime.java | 21 ++++++++++----------
.../wrapper/kafka/converter/JsonToMapFormat.java | 7 ++++---
7 files changed, 41 insertions(+), 29 deletions(-)
diff --git a/streampipes-wrapper-kafka-streams/pom.xml b/streampipes-wrapper-kafka-streams/pom.xml
index 1362a7770..bf2d19657 100644
--- a/streampipes-wrapper-kafka-streams/pom.xml
+++ b/streampipes-wrapper-kafka-streams/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>streampipes-parent</artifactId>
<groupId>org.apache.streampipes</groupId>
@@ -59,4 +60,12 @@
</exclusions>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java
index 633f20039..36b33d975 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorDeclarer.java
@@ -21,7 +21,7 @@ import org.apache.streampipes.wrapper.declarer.EventProcessorDeclarer;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
public abstract class KafkaStreamsDataProcessorDeclarer<B extends
- EventProcessorBindingParams> extends EventProcessorDeclarer<B, KafkaStreamsDataProcessorRuntime> {
+ EventProcessorBindingParams> extends EventProcessorDeclarer<B, KafkaStreamsDataProcessorRuntime> {
}
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java
index 79cbbdd29..02cd53799 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.java
@@ -17,10 +17,6 @@
*/
package org.apache.streampipes.wrapper.kafka;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -32,13 +28,18 @@ import org.apache.streampipes.wrapper.kafka.converter.MapToJsonFormat;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
import java.util.Map;
import java.util.regex.Pattern;
public abstract class KafkaStreamsDataProcessorRuntime<B extends
- EventProcessorBindingParams>
- extends KafkaStreamsRuntime<EventProcessorRuntimeParams<B>, B,
- DataProcessorInvocation, EventProcessorRuntimeContext> {
+ EventProcessorBindingParams>
+ extends KafkaStreamsRuntime<EventProcessorRuntimeParams<B>, B,
+ DataProcessorInvocation, EventProcessorRuntimeContext> {
public KafkaStreamsDataProcessorRuntime(EventProcessorRuntimeParams<B> runtimeParams) {
@@ -53,7 +54,7 @@ public abstract class KafkaStreamsDataProcessorRuntime<B extends
StreamsBuilder builder = new StreamsBuilder();
SpDataStream inputStream = runtimeParams.getBindingParams().getGraph().getInputStreams().get(0);
TransportProtocol protocol = protocol(inputStream);
- KStream<String, String> stream ;
+ KStream<String, String> stream;
if (protocol.getTopicDefinition() instanceof SimpleTopicDefinition) {
stream = builder.stream(getTopic(inputStream));
@@ -62,12 +63,12 @@ public abstract class KafkaStreamsDataProcessorRuntime<B extends
}
KStream<String, Map<String, Object>> mapFormat = stream.flatMapValues((ValueMapper<String, Iterable<Map<String,
- Object>>>) s -> new JsonToMapFormat(getGraph()).apply(s));
+ Object>>>) s -> new JsonToMapFormat(getGraph()).apply(s));
KStream<String, String> outStream = getApplicationLogic(mapFormat).flatMapValues(new
- MapToJsonFormat());
+ MapToJsonFormat());
outStream.to(getTopic(runtimeParams.getBindingParams().getGraph()
- .getOutputStream()));
+ .getOutputStream()));
streams = new KafkaStreams(builder.build(), config);
streams.start();
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java
index 36dfa4e8d..349f6b556 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkDeclarer.java
@@ -20,6 +20,6 @@ package org.apache.streampipes.wrapper.kafka;
import org.apache.streampipes.wrapper.declarer.EventSinkDeclarer;
import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-public abstract class KafkaStreamsDataSinkDeclarer <B extends
- EventSinkBindingParams> extends EventSinkDeclarer<B, KafkaStreamsDataSinkRuntime> {
+public abstract class KafkaStreamsDataSinkDeclarer<B extends
+ EventSinkBindingParams> extends EventSinkDeclarer<B, KafkaStreamsDataSinkRuntime> {
}
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java
index 9e9b8dfa2..56270602e 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsDataSinkRuntime.java
@@ -24,7 +24,7 @@ import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
public abstract class KafkaStreamsDataSinkRuntime<B extends EventSinkBindingParams>
- extends KafkaStreamsRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
+ extends KafkaStreamsRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
public KafkaStreamsDataSinkRuntime(EventSinkRuntimeParams<B> runtimeParams) {
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java
index cdb8d665a..cc4bd5c63 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java
@@ -17,10 +17,6 @@
*/
package org.apache.streampipes.wrapper.kafka;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.wrapper.context.RuntimeContext;
@@ -28,12 +24,17 @@ import org.apache.streampipes.wrapper.distributed.runtime.DistributedRuntime;
import org.apache.streampipes.wrapper.params.binding.BindingParams;
import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+
import java.util.Map;
import java.util.Properties;
public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B extends
- BindingParams<I>, I extends InvocableStreamPipesEntity, RC extends RuntimeContext> extends
- DistributedRuntime<RP, B, I, RC> {
+ BindingParams<I>, I extends InvocableStreamPipesEntity, RC extends RuntimeContext> extends
+ DistributedRuntime<RP, B, I, RC> {
Properties config;
KafkaStreams streams;
@@ -46,10 +47,10 @@ public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B
public void prepareRuntime() throws SpRuntimeException {
config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, gneerateApplicationId(runtimeParams.getBindingParams()
- .getGraph()
- .getElementId()));
+ .getGraph()
+ .getElementId()));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaUrl(runtimeParams.getBindingParams().getGraph()
- .getInputStreams().get(0)));
+ .getInputStreams().get(0)));
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
}
@@ -69,6 +70,6 @@ public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B
}
protected abstract KStream<String, Map<String, Object>> getApplicationLogic(KStream<String, Map<String, Object>>...
- inputStreams);
+ inputStreams);
}
diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java
index 7173d7c40..9a077d313 100644
--- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java
+++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/converter/JsonToMapFormat.java
@@ -17,18 +17,19 @@
*/
package org.apache.streampipes.wrapper.kafka.converter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.streampipes.logging.impl.EventStatisticLogger;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class JsonToMapFormat implements ValueMapper<String, Iterable<Map<String,
- Object>>> {
+ Object>>> {
private ObjectMapper mapper;
private InvocableStreamPipesEntity graph;