You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/22 20:09:17 UTC
[streampipes] 05/07: add checkstyle to streampipes-wrapper-flink
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 1e73b24fd1efd659822289bd7dfc0bb07d627c1c
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:06:56 2022 +0100
add checkstyle to streampipes-wrapper-flink
---
streampipes-wrapper-flink/pom.xml | 11 ++-
.../wrapper/flink/FlinkDataProcessorDeclarer.java | 4 +-
.../wrapper/flink/FlinkDataProcessorRuntime.java | 32 ++++-----
.../wrapper/flink/FlinkDataSinkDeclarer.java | 4 +-
.../wrapper/flink/FlinkDataSinkRuntime.java | 15 +++--
.../flink/FlinkMiniClusterDeploymentConfig.java | 6 +-
.../streampipes/wrapper/flink/FlinkRuntime.java | 78 ++++++++++++----------
.../wrapper/flink/FlinkSpMiniCluster.java | 8 +--
.../wrapper/flink/consumer/JmsFlinkConsumer.java | 7 +-
.../wrapper/flink/consumer/MqttFlinkConsumer.java | 5 +-
.../flink/converter/EventToMapConverter.java | 5 +-
.../flink/converter/MapToEventConverter.java | 13 ++--
.../wrapper/flink/logger/StatisticLogger.java | 23 ++++---
.../flink/serializer/ByteArrayDeserializer.java | 3 +-
.../flink/serializer/ByteArraySerializer.java | 3 +-
.../flink/serializer/SimpleJmsSerializer.java | 34 +++++-----
.../wrapper/flink/sink/JmsFlinkProducer.java | 7 +-
.../wrapper/flink/sink/MqttFlinkProducer.java | 7 +-
.../flink/status/PipelineElementStatusSender.java | 3 +-
.../status/PipelineElementStatusSenderFactory.java | 10 +--
20 files changed, 154 insertions(+), 124 deletions(-)
diff --git a/streampipes-wrapper-flink/pom.xml b/streampipes-wrapper-flink/pom.xml
index 6208b71b8..170f966cb 100644
--- a/streampipes-wrapper-flink/pom.xml
+++ b/streampipes-wrapper-flink/pom.xml
@@ -16,7 +16,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampipes</groupId>
@@ -111,4 +112,12 @@
<artifactId>snappy-java</artifactId>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java
index 72405b983..d6bd199d2 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorDeclarer.java
@@ -21,8 +21,8 @@ package org.apache.streampipes.wrapper.flink;
import org.apache.streampipes.wrapper.declarer.EventProcessorDeclarer;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-public abstract class FlinkDataProcessorDeclarer<B extends EventProcessorBindingParams>
- extends EventProcessorDeclarer<B, FlinkDataProcessorRuntime<B>> {
+public abstract class FlinkDataProcessorDeclarer<T extends EventProcessorBindingParams>
+ extends EventProcessorDeclarer<T, FlinkDataProcessorRuntime<T>> {
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
index 23baa1cdf..f1269507f 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
@@ -18,8 +18,6 @@
package org.apache.streampipes.wrapper.flink;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.container.config.ConfigExtractor;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
@@ -35,6 +33,9 @@ import org.apache.streampipes.wrapper.flink.sink.JmsFlinkProducer;
import org.apache.streampipes.wrapper.flink.sink.MqttFlinkProducer;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,15 +43,15 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
-public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingParams> extends
- FlinkRuntime<EventProcessorRuntimeParams<B>, B,
+public abstract class FlinkDataProcessorRuntime<T extends EventProcessorBindingParams> extends
+ FlinkRuntime<EventProcessorRuntimeParams<T>, T,
DataProcessorInvocation, EventProcessorRuntimeContext> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(FlinkDataProcessorRuntime.class);
- public FlinkDataProcessorRuntime(B params,
+ public FlinkDataProcessorRuntime(T params,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
super(params, configExtractor, streamPipesClient);
@@ -59,25 +60,25 @@ public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingP
@SuppressWarnings("deprecation")
public void appendExecutionConfig(DataStream<Event>... convertedStream) {
DataStream<Map<String, Object>> applicationLogic = getApplicationLogic(convertedStream).flatMap
- (new EventToMapConverter());
+ (new EventToMapConverter());
EventGrounding outputGrounding = getOutputStream().getEventGrounding();
SpDataFormatDefinition outputDataFormatDefinition =
- getDataFormatDefinition(outputGrounding.getTransportFormats().get(0));
+ getDataFormatDefinition(outputGrounding.getTransportFormats().get(0));
ByteArraySerializer serializer =
- new ByteArraySerializer(outputDataFormatDefinition);
+ new ByteArraySerializer(outputDataFormatDefinition);
if (isKafkaProtocol(getOutputStream())) {
applicationLogic
- .addSink(new FlinkKafkaProducer<>(getTopic(getOutputStream()),
- serializer,
- getProducerProperties((KafkaTransportProtocol) outputGrounding.getTransportProtocol())));
+ .addSink(new FlinkKafkaProducer<>(getTopic(getOutputStream()),
+ serializer,
+ getProducerProperties((KafkaTransportProtocol) outputGrounding.getTransportProtocol())));
} else if (isJmsProtocol(getOutputStream())) {
applicationLogic
- .addSink(new JmsFlinkProducer(getJmsProtocol(getOutputStream()), serializer));
+ .addSink(new JmsFlinkProducer(getJmsProtocol(getOutputStream()), serializer));
} else if (isMqttProtocol(getOutputStream())) {
applicationLogic
- .addSink(new MqttFlinkProducer(getMqttProtocol(getOutputStream()), serializer));
+ .addSink(new MqttFlinkProducer(getMqttProtocol(getOutputStream()), serializer));
}
}
@@ -100,9 +101,10 @@ public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingP
}
@Override
- protected EventProcessorRuntimeParams<B> makeRuntimeParams(ConfigExtractor configExtractor,
+ protected EventProcessorRuntimeParams<T> makeRuntimeParams(ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
- LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by a deployed Flink program due to non-serializable classes.");
+ LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by"
+ + " a deployed Flink program due to non-serializable classes.");
return new EventProcessorRuntimeParams<>(bindingParams, false, null, null);
}
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java
index 4af5d9190..a9001b6a4 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkDeclarer.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.wrapper.flink;
import org.apache.streampipes.wrapper.declarer.EventSinkDeclarer;
import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-public abstract class FlinkDataSinkDeclarer<B extends EventSinkBindingParams>
- extends EventSinkDeclarer<B, FlinkDataSinkRuntime<B>> {
+public abstract class FlinkDataSinkDeclarer<T extends EventSinkBindingParams>
+ extends EventSinkDeclarer<T, FlinkDataSinkRuntime<T>> {
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
index d633eec58..e9f062f11 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.wrapper.flink;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.container.config.ConfigExtractor;
import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -26,16 +25,18 @@ import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class FlinkDataSinkRuntime<B extends EventSinkBindingParams> extends
- FlinkRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
+public abstract class FlinkDataSinkRuntime<T extends EventSinkBindingParams> extends
+ FlinkRuntime<EventSinkRuntimeParams<T>, T, DataSinkInvocation, EventSinkRuntimeContext> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(FlinkDataSinkRuntime.class);
- public FlinkDataSinkRuntime(B params,
+ public FlinkDataSinkRuntime(T params,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
super(params, configExtractor, streamPipesClient);
@@ -49,9 +50,11 @@ public abstract class FlinkDataSinkRuntime<B extends EventSinkBindingParams> ext
public abstract void getSink(DataStream<Event>... convertedStream1);
- protected EventSinkRuntimeParams<B> makeRuntimeParams(ConfigExtractor configExtractor,
+ protected EventSinkRuntimeParams<T> makeRuntimeParams(ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
- LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by a deployed Flink program due to non-serializable classes.");
+ LOG.warn(
+ "The config extractor and StreamPipes Client can currently not be accessed by"
+ + " a deployed Flink program due to non-serializable classes.");
return new EventSinkRuntimeParams<>(bindingParams, false, null, null);
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
index 3501c78cf..9ede5d55d 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.wrapper.flink;
public class FlinkMiniClusterDeploymentConfig extends FlinkDeploymentConfig {
- public FlinkMiniClusterDeploymentConfig() {
- super("", "localhost", 6123, true);
- }
+ public FlinkMiniClusterDeploymentConfig() {
+ super("", "localhost", 6123, true);
+ }
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
index b1f688385..e25cac0eb 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
@@ -18,25 +18,18 @@
package org.apache.streampipes.wrapper.flink;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.container.config.ConfigExtractor;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.grounding.*;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.RuntimeContext;
import org.apache.streampipes.wrapper.distributed.runtime.DistributedRuntime;
@@ -48,6 +41,19 @@ import org.apache.streampipes.wrapper.flink.serializer.ByteArrayDeserializer;
import org.apache.streampipes.wrapper.params.binding.BindingParams;
import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
@@ -55,10 +61,10 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
-public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends BindingParams<I>, I
- extends
- InvocableStreamPipesEntity, RC extends RuntimeContext> extends
- DistributedRuntime<RP, B, I, RC> implements Runnable, Serializable {
+public abstract class FlinkRuntime<T extends RuntimeParams<V, W, X>, V extends BindingParams<W>, W
+ extends
+ InvocableStreamPipesEntity, X extends RuntimeContext> extends
+ DistributedRuntime<T, V, W, X> implements Runnable, Serializable {
private static final long serialVersionUID = 1L;
@@ -66,7 +72,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
protected FlinkDeploymentConfig config;
private StreamExecutionEnvironment env;
- public FlinkRuntime(B bindingParams,
+ public FlinkRuntime(V bindingParams,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
super(bindingParams, configExtractor, streamPipesClient);
@@ -80,9 +86,9 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
} else {
FlinkSpMiniCluster.INSTANCE.start();
FlinkSpMiniCluster
- .INSTANCE
- .getClusterClient()
- .submitJob(env.getStreamGraph(bindingParams.getGraph().getElementId()).getJobGraph());
+ .INSTANCE
+ .getClusterClient()
+ .submitJob(env.getStreamGraph(bindingParams.getGraph().getElementId()).getJobGraph());
}
} catch (Exception e) {
e.printStackTrace();
@@ -135,7 +141,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
}
private SourceFunction<Map<String, Object>> getJmsConsumer(JmsTransportProtocol protocol,
- SpDataFormatDefinition spDataFormatDefinition) {
+ SpDataFormatDefinition spDataFormatDefinition) {
return new JmsFlinkConsumer(protocol, spDataFormatDefinition);
}
@@ -148,12 +154,12 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
SpDataFormatDefinition spDataFormatDefinition) {
if (protocol.getTopicDefinition() instanceof SimpleTopicDefinition) {
return new FlinkKafkaConsumer<>(protocol
- .getTopicDefinition()
- .getActualTopicName(), new ByteArrayDeserializer(spDataFormatDefinition), getProperties(protocol));
+ .getTopicDefinition()
+ .getActualTopicName(), new ByteArrayDeserializer(spDataFormatDefinition), getProperties(protocol));
} else {
String patternTopic = replaceWildcardWithPatternFormat(protocol.getTopicDefinition().getActualTopicName());
return new FlinkKafkaConsumer<>(Pattern.compile(patternTopic), new ByteArrayDeserializer(spDataFormatDefinition),
- getProperties(protocol));
+ getProperties(protocol));
}
}
@@ -163,7 +169,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
this.env = StreamExecutionEnvironment.createLocalEnvironment();
} else {
this.env = StreamExecutionEnvironment
- .createRemoteEnvironment(config.getHost(), config.getPort(), config.getJarFile());
+ .createRemoteEnvironment(config.getHost(), config.getPort(), config.getJarFile());
}
appendEnvironmentConfig(this.env);
@@ -188,10 +194,10 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
private DataStream<Event> addSource(SourceFunction<Map<String, Object>> sourceFunction,
Integer sourceIndex) {
return env
- .addSource(sourceFunction)
- .flatMap(new MapToEventConverter<>(runtimeParams.getSourceInfo(sourceIndex).getSourceId(),
- runtimeParams))
- .flatMap(new StatisticLogger(getGraph()));
+ .addSource(sourceFunction)
+ .flatMap(new MapToEventConverter<>(runtimeParams.getSourceInfo(sourceIndex).getSourceId(),
+ runtimeParams))
+ .flatMap(new StatisticLogger(getGraph()));
}
@Override
@@ -219,9 +225,9 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
count++;
Thread.sleep(1000);
Optional<JobStatusMessage> statusMessageOpt =
- getJobStatus(bindingParams.getGraph().getElementId());
+ getJobStatus(bindingParams.getGraph().getElementId());
if (statusMessageOpt.isPresent()
- && statusMessageOpt.get().getJobState().name().equals("RUNNING")) {
+ && statusMessageOpt.get().getJobState().name().equals("RUNNING")) {
isDeployed = true;
}
@@ -246,7 +252,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
try {
ClusterClient<? extends Comparable<? extends Comparable<?>>> clusterClient = getClusterClient();
Optional<JobStatusMessage> jobStatusMessage =
- getJobStatus(bindingParams.getGraph().getElementId());
+ getJobStatus(bindingParams.getGraph().getElementId());
if (jobStatusMessage.isPresent()) {
String jobStatusStr = jobStatusMessage.get().getJobState().name();
// Cancel the job if running
@@ -304,15 +310,15 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
// First, find a job with Running status
Optional<JobStatusMessage> job = jobsFound.stream()
- .filter(j -> j.getJobName().equals(jobName) && j.getJobState().name().equals("RUNNING")).findFirst();
+ .filter(j -> j.getJobName().equals(jobName) && j.getJobState().name().equals("RUNNING")).findFirst();
if (job.isPresent()) {
return job;
}
// Otherwise return job with any other status
return jobs.get()
- .stream()
- .filter(j -> j.getJobName().equals(jobName)).findFirst();
+ .stream()
+ .filter(j -> j.getJobName().equals(jobName)).findFirst();
} catch (Exception e) {
e.printStackTrace();
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java
index 0d1260dfa..1fccbe4a7 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkSpMiniCluster.java
@@ -36,10 +36,10 @@ public enum FlinkSpMiniCluster {
Configuration configuration = new Configuration();
configuration.setString(RestOptions.BIND_PORT, "0");
MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration
- .Builder()
- .setConfiguration(configuration)
- .setNumTaskManagers(2)
- .build();
+ .Builder()
+ .setConfiguration(configuration)
+ .setNumTaskManagers(2)
+ .build();
this.miniCluster = new MiniCluster(miniClusterConfiguration);
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java
index 8315e6c95..3706c818a 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/JmsFlinkConsumer.java
@@ -17,15 +17,16 @@
*/
package org.apache.streampipes.wrapper.flink.consumer;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.Serializable;
import java.util.Map;
import java.util.Queue;
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java
index 81eb2551c..f4a0f77f8 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/consumer/MqttFlinkConsumer.java
@@ -17,12 +17,13 @@
*/
package org.apache.streampipes.wrapper.flink.consumer;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.mqtt.MqttConsumer;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +39,8 @@ public class MqttFlinkConsumer implements SourceFunction<Map<String, Object>>, S
private final MqttTransportProtocol protocol;
private final MqttConsumer mqttConsumer;
private final SpDataFormatDefinition spDataFormatDefinition;
- private Boolean isRunning;
private final Queue<byte[]> queue;
+ private Boolean isRunning;
public MqttFlinkConsumer(MqttTransportProtocol protocol, SpDataFormatDefinition spDataFormatDefinition) {
this.protocol = protocol;
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java
index 0e91c76d0..8d5d4dbb0 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/EventToMapConverter.java
@@ -17,11 +17,12 @@
*/
package org.apache.streampipes.wrapper.flink.converter;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventConverter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
import java.util.Map;
public class EventToMapConverter implements FlatMapFunction<Event, Map<String, Object>> {
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java
index 5c5b11ab5..2e1e01983 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/converter/MapToEventConverter.java
@@ -17,23 +17,24 @@
*/
package org.apache.streampipes.wrapper.flink.converter;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.params.runtime.RuntimeParams;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
import java.util.Map;
-public class MapToEventConverter<RP extends RuntimeParams<?, ?, ?>> implements
- FlatMapFunction<Map<String,
+public class MapToEventConverter<T extends RuntimeParams<?, ?, ?>> implements
+ FlatMapFunction<Map<String,
Object>, Event> {
private static final long serialVersionUID = 1L;
- private RP runtimeParams;
+ private T runtimeParams;
private String sourceId;
- public MapToEventConverter(String sourceId, RP runtimeParams) {
+ public MapToEventConverter(String sourceId, T runtimeParams) {
this.sourceId = sourceId;
this.runtimeParams = runtimeParams;
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java
index 79d847fc9..1fc4f1707 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/logger/StatisticLogger.java
@@ -18,25 +18,26 @@
package org.apache.streampipes.wrapper.flink.logger;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
import org.apache.streampipes.logging.impl.EventStatisticLogger;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.runtime.Event;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
public class StatisticLogger implements FlatMapFunction<Event, Event> {
- private InvocableStreamPipesEntity graph;
+ private InvocableStreamPipesEntity graph;
- public StatisticLogger(InvocableStreamPipesEntity graph) {
- this.graph = graph;
- }
+ public StatisticLogger(InvocableStreamPipesEntity graph) {
+ this.graph = graph;
+ }
- @Override
- public void flatMap(Event in, Collector<Event> out) throws Exception {
- EventStatisticLogger.log(graph.getName(), graph.getCorrespondingPipeline(), graph.getUri());
- out.collect(in);
- }
+ @Override
+ public void flatMap(Event in, Collector<Event> out) throws Exception {
+ EventStatisticLogger.log(graph.getName(), graph.getCorrespondingPipeline(), graph.getUri());
+ out.collect(in);
+ }
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java
index 82020e0d6..81b195fd4 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArrayDeserializer.java
@@ -17,10 +17,11 @@
*/
package org.apache.streampipes.wrapper.flink.serializer;
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+
import java.io.IOException;
import java.util.Map;
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java
index a36373b39..bc5354b11 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/ByteArraySerializer.java
@@ -17,10 +17,11 @@
*/
package org.apache.streampipes.wrapper.flink.serializer;
-import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
import java.util.Map;
public class ByteArraySerializer implements SerializationSchema<Map<String, Object>> {
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java
index df10f121d..e6834e7de 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/serializer/SimpleJmsSerializer.java
@@ -24,22 +24,22 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
import java.util.Map;
-public class SimpleJmsSerializer implements SerializationSchema<Map<String, Object>>{
-
- private ObjectMapper objectMapper;
-
- public SimpleJmsSerializer() {
- this.objectMapper = new ObjectMapper();
- }
-
- @Override
- public byte[] serialize(Map<String, Object> payload) {
- try {
- return objectMapper.writeValueAsBytes(payload);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- return null;
- }
- }
+public class SimpleJmsSerializer implements SerializationSchema<Map<String, Object>> {
+
+ private ObjectMapper objectMapper;
+
+ public SimpleJmsSerializer() {
+ this.objectMapper = new ObjectMapper();
+ }
+
+ @Override
+ public byte[] serialize(Map<String, Object> payload) {
+ try {
+ return objectMapper.writeValueAsBytes(payload);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java
index 35270bec4..549d46c04 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/JmsFlinkProducer.java
@@ -18,12 +18,13 @@
package org.apache.streampipes.wrapper.flink.sink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.wrapper.flink.serializer.ByteArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
import java.util.Map;
@@ -40,7 +41,7 @@ public class JmsFlinkProducer extends RichSinkFunction<Map<String, Object>> {
private ActiveMQPublisher publisher;
public JmsFlinkProducer(JmsTransportProtocol protocol, ByteArraySerializer
- serializationSchema) {
+ serializationSchema) {
this.protocol = protocol;
this.serializationSchema = serializationSchema;
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java
index bda60da93..e2b6e4f82 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/sink/MqttFlinkProducer.java
@@ -18,12 +18,13 @@
package org.apache.streampipes.wrapper.flink.sink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.streampipes.messaging.mqtt.MqttPublisher;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.wrapper.flink.serializer.ByteArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
import java.util.Map;
@@ -40,7 +41,7 @@ public class MqttFlinkProducer extends RichSinkFunction<Map<String, Object>> {
private MqttPublisher publisher;
public MqttFlinkProducer(MqttTransportProtocol protocol, ByteArraySerializer
- serializationSchema) {
+ serializationSchema) {
this.protocol = protocol;
this.serializationSchema = serializationSchema;
}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java
index 8eb3c19dd..123dbf1e8 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSender.java
@@ -18,9 +18,10 @@
package org.apache.streampipes.wrapper.flink.status;
-import com.google.gson.Gson;
import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
+import com.google.gson.Gson;
+
import java.io.Serializable;
public class PipelineElementStatusSender implements Serializable {
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java
index 76155bed8..df5fd0890 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/status/PipelineElementStatusSenderFactory.java
@@ -24,19 +24,19 @@ import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
public class PipelineElementStatusSenderFactory {
- public static <I extends InvocableStreamPipesEntity> PipelineElementStatusSender getStatusSender(I graph) {
+ public static <T extends InvocableStreamPipesEntity> PipelineElementStatusSender getStatusSender(T graph) {
SpKafkaProducer kafkaProducer = new SpKafkaProducer();
// TODO refactor
return new PipelineElementStatusSender(kafkaProducer,
- graph.getStatusInfoSettings().getErrorTopic(),
- graph.getStatusInfoSettings().getStatsTopic());
+ graph.getStatusInfoSettings().getErrorTopic(),
+ graph.getStatusInfoSettings().getStatsTopic());
}
- private static <I extends InvocableStreamPipesEntity> String buildKafkaUrl(I graph) {
+ private static <T extends InvocableStreamPipesEntity> String buildKafkaUrl(T graph) {
ElementStatusInfoSettings settings = graph.getStatusInfoSettings();
- return settings.getKafkaHost() +":" +settings.getKafkaPort();
+ return settings.getKafkaHost() + ":" + settings.getKafkaPort();
}
}