You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/22 20:09:15 UTC
[streampipes] 03/07: add checkstyle to streampipes-wrapper-standalone
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch chore/add-checkstyle
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 3e9ffeb974b2afaa4493ac7490e0fcc4d12945de
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Dec 22 21:00:58 2022 +0100
add checkstyle to streampipes-wrapper-standalone
---
streampipes-wrapper-standalone/pom.xml | 43 ++--
.../AbstractConfiguredPipelineElement.java | 14 +-
.../standalone/ConfiguredEventProcessor.java | 6 +-
.../wrapper/standalone/ConfiguredEventSink.java | 5 +-
.../ConfiguredExternalEventProcessor.java | 8 +-
.../standalone/ConfiguredExternalEventSink.java | 6 +-
.../standalone/StreamPipesDataProcessor.java | 6 +-
.../wrapper/standalone/StreamPipesDataSink.java | 3 +-
.../StreamPipesExternalDataProcessor.java | 222 +++++++++++----------
.../StandaloneEventProcessingDeclarer.java | 21 +-
.../StandaloneEventProcessorDeclarerSingleton.java | 27 ++-
.../declarer/StandaloneEventSinkDeclarer.java | 20 +-
.../StandaloneEventSinkDeclarerSingleton.java | 20 +-
.../StandaloneExternalEventProcessingDeclarer.java | 26 ++-
.../StandaloneExternalEventSinkDeclarer.java | 29 ++-
.../standalone/function/StreamPipesFunction.java | 3 +-
.../standalone/manager/ProtocolManager.java | 30 +--
.../standalone/routing/StandaloneSpCollector.java | 12 +-
.../routing/StandaloneSpInputCollector.java | 12 +-
.../routing/StandaloneSpOutputCollector.java | 11 +-
.../runtime/StandaloneEventProcessorRuntime.java | 11 +-
.../runtime/StandaloneEventSinkRuntime.java | 11 +-
.../StandaloneExternalEventProcessorRuntime.java | 10 +-
.../StandaloneExternalEventSinkRuntime.java | 9 +-
.../runtime/StandalonePipelineElementRuntime.java | 24 +--
25 files changed, 329 insertions(+), 260 deletions(-)
diff --git a/streampipes-wrapper-standalone/pom.xml b/streampipes-wrapper-standalone/pom.xml
index df12f29d9..a873cb7eb 100644
--- a/streampipes-wrapper-standalone/pom.xml
+++ b/streampipes-wrapper-standalone/pom.xml
@@ -16,23 +16,32 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-parent</artifactId>
- <version>0.91.0-SNAPSHOT</version>
- </parent>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-parent</artifactId>
+ <version>0.91.0-SNAPSHOT</version>
+ </parent>
- <artifactId>streampipes-wrapper-standalone</artifactId>
- <name>StreamPipes Wrapper for Standalone Pipeline Element Implementations</name>
+ <artifactId>streampipes-wrapper-standalone</artifactId>
+ <name>StreamPipes Wrapper for Standalone Pipeline Element Implementations</name>
- <dependencies>
- <!-- StreamPipes dependencies -->
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-wrapper</artifactId>
- <version>0.91.0-SNAPSHOT</version>
- </dependency>
- </dependencies>
+ <dependencies>
+ <!-- StreamPipes dependencies -->
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-wrapper</artifactId>
+ <version>0.91.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java
index 0d1a189b8..01c42ba73 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/AbstractConfiguredPipelineElement.java
@@ -23,20 +23,20 @@ import org.apache.streampipes.wrapper.runtime.PipelineElement;
import java.util.function.Supplier;
-public class AbstractConfiguredPipelineElement<I extends InvocableStreamPipesEntity,
- B extends BindingParams<I>,
- T extends PipelineElement<B, I>> {
+public class AbstractConfiguredPipelineElement<K extends InvocableStreamPipesEntity,
+ V extends BindingParams<K>,
+ T extends PipelineElement<V, K>> {
- private B bindingParams;
+ private V bindingParams;
private Supplier<T> engineSupplier;
- public AbstractConfiguredPipelineElement(B bindingParams,
- Supplier<T> engineSupplier) {
+ public AbstractConfiguredPipelineElement(V bindingParams,
+ Supplier<T> engineSupplier) {
this.bindingParams = bindingParams;
this.engineSupplier = engineSupplier;
}
- public B getBindingParams() {
+ public V getBindingParams() {
return bindingParams;
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java
index 551956e76..d4e77a9b2 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventProcessor.java
@@ -24,10 +24,10 @@ import org.apache.streampipes.wrapper.runtime.EventProcessor;
import java.util.function.Supplier;
-public class ConfiguredEventProcessor<B extends EventProcessorBindingParams>
- extends AbstractConfiguredPipelineElement<DataProcessorInvocation, B, EventProcessor<B>> {
+public class ConfiguredEventProcessor<T extends EventProcessorBindingParams>
+ extends AbstractConfiguredPipelineElement<DataProcessorInvocation, T, EventProcessor<T>> {
- public ConfiguredEventProcessor(B bindingParams, Supplier<EventProcessor<B>> engineSupplier) {
+ public ConfiguredEventProcessor(T bindingParams, Supplier<EventProcessor<T>> engineSupplier) {
super(bindingParams, engineSupplier);
}
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java
index fbc8b05d9..f6a8e2b49 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredEventSink.java
@@ -24,9 +24,10 @@ import org.apache.streampipes.wrapper.runtime.EventSink;
import java.util.function.Supplier;
-public class ConfiguredEventSink<B extends EventSinkBindingParams> extends AbstractConfiguredPipelineElement<DataSinkInvocation, B, EventSink<B>> {
+public class ConfiguredEventSink<T extends EventSinkBindingParams>
+ extends AbstractConfiguredPipelineElement<DataSinkInvocation, T, EventSink<T>> {
- public ConfiguredEventSink(B bindingParams, Supplier<EventSink<B>> engineSupplier) {
+ public ConfiguredEventSink(T bindingParams, Supplier<EventSink<T>> engineSupplier) {
super(bindingParams, engineSupplier);
}
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java
index 147b11fcb..67fe96048 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventProcessor.java
@@ -23,11 +23,11 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
import java.util.function.Supplier;
-public class ConfiguredExternalEventProcessor<B extends EventProcessorBindingParams>
- extends AbstractConfiguredPipelineElement<DataProcessorInvocation, B,
- ExternalEventProcessor<B>> {
+public class ConfiguredExternalEventProcessor<T extends EventProcessorBindingParams>
+ extends AbstractConfiguredPipelineElement<DataProcessorInvocation, T,
+ ExternalEventProcessor<T>> {
- public ConfiguredExternalEventProcessor(B bindingParams, Supplier<ExternalEventProcessor<B>> engineSupplier) {
+ public ConfiguredExternalEventProcessor(T bindingParams, Supplier<ExternalEventProcessor<T>> engineSupplier) {
super(bindingParams, engineSupplier);
}
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java
index 38418fba3..9abf7b413 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/ConfiguredExternalEventSink.java
@@ -23,10 +23,10 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventSink;
import java.util.function.Supplier;
-public class ConfiguredExternalEventSink<B extends EventSinkBindingParams>
- extends AbstractConfiguredPipelineElement<DataSinkInvocation, B, ExternalEventSink<B>> {
+public class ConfiguredExternalEventSink<T extends EventSinkBindingParams>
+ extends AbstractConfiguredPipelineElement<DataSinkInvocation, T, ExternalEventSink<T>> {
- public ConfiguredExternalEventSink(B bindingParams, Supplier<ExternalEventSink<B>> engineSupplier) {
+ public ConfiguredExternalEventSink(T bindingParams, Supplier<ExternalEventSink<T>> engineSupplier) {
super(bindingParams, engineSupplier);
}
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
index f5dde0257..f938e413d 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataProcessor.java
@@ -24,10 +24,12 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
import java.util.function.Supplier;
-public abstract class StreamPipesDataProcessor extends StandaloneEventProcessingDeclarer<ProcessorParams> implements EventProcessor<ProcessorParams> {
+public abstract class StreamPipesDataProcessor extends StandaloneEventProcessingDeclarer<ProcessorParams>
+ implements EventProcessor<ProcessorParams> {
@Override
- public ConfiguredEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public ConfiguredEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
+ ProcessingElementParameterExtractor extractor) {
Supplier<EventProcessor<ProcessorParams>> supplier = () -> this;
return new ConfiguredEventProcessor<>(new ProcessorParams(graph), supplier);
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
index b087b0b2b..5fa3dc4af 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesDataSink.java
@@ -24,7 +24,8 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDec
import java.util.function.Supplier;
-public abstract class StreamPipesDataSink extends StandaloneEventSinkDeclarer<SinkParams> implements EventSink<SinkParams> {
+public abstract class StreamPipesDataSink extends StandaloneEventSinkDeclarer<SinkParams>
+ implements EventSink<SinkParams> {
@Override
public ConfiguredEventSink<SinkParams> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
index 1992905c3..3b29d8a62 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
@@ -17,9 +17,6 @@
*/
package org.apache.streampipes.wrapper.standalone;
-import com.google.gson.JsonObject;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -27,119 +24,124 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
+import com.google.gson.JsonObject;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
-public abstract class StreamPipesExternalDataProcessor extends StandaloneExternalEventProcessingDeclarer<ProcessorParams>
- implements ExternalEventProcessor<ProcessorParams> {
-
- // endpoint of Python processor runs here
- private static final String HTTP_PROTOCOL = "http://";
- private static final String PYTHON_ENDPOINT = "localhost:5000";
-
- private String invocationId;
- private String appId;
- private String inputTopic;
- private String outputTopic;
- private String kafkaUrl;
-
- @Override
- public ConfiguredExternalEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
- ProcessingElementParameterExtractor extractor) {
- EventProcessorBindingParams params = new ProcessorParams(graph);
- invocationId = UUID.randomUUID().toString();
- appId = graph.getAppId();
- inputTopic = getInputTopic(params);
- outputTopic = getOutputTopic(params);
- kafkaUrl = getKafkaUrl(params);
-
- Supplier<ExternalEventProcessor<ProcessorParams>> supplier = () -> this;
- return new ConfiguredExternalEventProcessor<>(new ProcessorParams(graph), supplier);
- }
-
- protected JsonObject createMinimalInvocationGraph(Map<String, String> staticPropertyMap) {
- JsonObject json = new JsonObject();
-
- json.addProperty("invocation_id", invocationId);
- json.addProperty("processor_id", appId);
- json.addProperty("input_topics", inputTopic);
- json.addProperty("output_topics", outputTopic);
- json.addProperty("bootstrap_servers", kafkaUrl);
-
- JsonObject staticProperties = new JsonObject();
- staticPropertyMap.forEach(staticProperties::addProperty);
- json.add("static_properties", staticProperties);
-
- return json;
- }
-
- protected void invoke(JsonObject json) {
- post("invoke", json.toString());
- }
-
- protected void detach () {
- JsonObject json = new JsonObject();
- json.addProperty("invocation_id", invocationId);
- post("detach", json.toString());
- }
-
- private static String post(String endpoint, String payload) {
- String responseString = null;
-
- try {
- responseString = Request.Post(HTTP_PROTOCOL + PYTHON_ENDPOINT + "/" + endpoint)
- .bodyString(payload, ContentType.APPLICATION_JSON)
- .connectTimeout(1000)
- .socketTimeout(100000)
- .execute().returnContent().asString();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return responseString;
- }
-
- private String getInputTopic(EventProcessorBindingParams parameters) {
- return parameters
- .getGraph()
- .getInputStreams()
- .get(0)
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
- }
-
- private String getOutputTopic(EventProcessorBindingParams parameters) {
- return parameters
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
- }
-
- private String getKafkaUrl(EventProcessorBindingParams parameters) {
- String brokerHostname = parameters
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocols()
- .get(0)
- .getBrokerHostname();
-
- Integer kafkaPort = ((KafkaTransportProtocol) parameters
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocols()
- .get(0))
- .getKafkaPort();
-
- return brokerHostname + ":" + kafkaPort.toString();
+public abstract class StreamPipesExternalDataProcessor
+ extends StandaloneExternalEventProcessingDeclarer<ProcessorParams>
+ implements ExternalEventProcessor<ProcessorParams> {
+
+ // endpoint of Python processor runs here
+ private static final String HTTP_PROTOCOL = "http://";
+ private static final String PYTHON_ENDPOINT = "localhost:5000";
+
+ private String invocationId;
+ private String appId;
+ private String inputTopic;
+ private String outputTopic;
+ private String kafkaUrl;
+
+ private static String post(String endpoint, String payload) {
+ String responseString = null;
+
+ try {
+ responseString = Request.Post(HTTP_PROTOCOL + PYTHON_ENDPOINT + "/" + endpoint)
+ .bodyString(payload, ContentType.APPLICATION_JSON)
+ .connectTimeout(1000)
+ .socketTimeout(100000)
+ .execute().returnContent().asString();
+ } catch (IOException e) {
+ e.printStackTrace();
}
+ return responseString;
+ }
+
+ @Override
+ public ConfiguredExternalEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
+ ProcessingElementParameterExtractor extractor) {
+ EventProcessorBindingParams params = new ProcessorParams(graph);
+ invocationId = UUID.randomUUID().toString();
+ appId = graph.getAppId();
+ inputTopic = getInputTopic(params);
+ outputTopic = getOutputTopic(params);
+ kafkaUrl = getKafkaUrl(params);
+
+ Supplier<ExternalEventProcessor<ProcessorParams>> supplier = () -> this;
+ return new ConfiguredExternalEventProcessor<>(new ProcessorParams(graph), supplier);
+ }
+
+ protected JsonObject createMinimalInvocationGraph(Map<String, String> staticPropertyMap) {
+ JsonObject json = new JsonObject();
+
+ json.addProperty("invocation_id", invocationId);
+ json.addProperty("processor_id", appId);
+ json.addProperty("input_topics", inputTopic);
+ json.addProperty("output_topics", outputTopic);
+ json.addProperty("bootstrap_servers", kafkaUrl);
+
+ JsonObject staticProperties = new JsonObject();
+ staticPropertyMap.forEach(staticProperties::addProperty);
+ json.add("static_properties", staticProperties);
+
+ return json;
+ }
+
+ protected void invoke(JsonObject json) {
+ post("invoke", json.toString());
+ }
+
+ protected void detach() {
+ JsonObject json = new JsonObject();
+ json.addProperty("invocation_id", invocationId);
+ post("detach", json.toString());
+ }
+
+ private String getInputTopic(EventProcessorBindingParams parameters) {
+ return parameters
+ .getGraph()
+ .getInputStreams()
+ .get(0)
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName();
+ }
+
+ private String getOutputTopic(EventProcessorBindingParams parameters) {
+ return parameters
+ .getGraph()
+ .getOutputStream()
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName();
+ }
+
+ private String getKafkaUrl(EventProcessorBindingParams parameters) {
+ String brokerHostname = parameters
+ .getGraph()
+ .getOutputStream()
+ .getEventGrounding()
+ .getTransportProtocols()
+ .get(0)
+ .getBrokerHostname();
+
+ Integer kafkaPort = ((KafkaTransportProtocol) parameters
+ .getGraph()
+ .getOutputStream()
+ .getEventGrounding()
+ .getTransportProtocols()
+ .get(0))
+ .getKafkaPort();
+
+ return brokerHostname + ":" + kafkaPort.toString();
+ }
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
index 7bbc6d84c..d52cde04c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
@@ -29,22 +29,27 @@ import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
@Deprecated(since = "0.70.0", forRemoval = true)
-public abstract class StandaloneEventProcessingDeclarer<B extends
- EventProcessorBindingParams> extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
+public abstract class StandaloneEventProcessingDeclarer<T extends
+ EventProcessorBindingParams> extends EventProcessorDeclarer<T, StandaloneEventProcessorRuntime<T>> {
- public abstract ConfiguredEventProcessor<B> onInvocation(DataProcessorInvocation graph,
+ public abstract ConfiguredEventProcessor<T> onInvocation(DataProcessorInvocation graph,
ProcessingElementParameterExtractor extractor);
@Override
- public StandaloneEventProcessorRuntime<B> getRuntime(DataProcessorInvocation graph,
+ public StandaloneEventProcessorRuntime<T> getRuntime(DataProcessorInvocation graph,
ProcessingElementParameterExtractor extractor,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
- ConfiguredEventProcessor<B> configuredEngine = onInvocation(graph, extractor);
- EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>
- (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+ ConfiguredEventProcessor<T> configuredEngine = onInvocation(graph, extractor);
+ EventProcessorRuntimeParams<T> runtimeParams =
+ new EventProcessorRuntimeParams<> (
+ configuredEngine.getBindingParams(),
+ false,
+ configExtractor,
+ streamPipesClient
+ );
return new StandaloneEventProcessorRuntime<>(configuredEngine.getEngineSupplier(),
- runtimeParams);
+ runtimeParams);
}
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java
index 2fcf23f40..74010a8a9 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessorDeclarerSingleton.java
@@ -28,22 +28,33 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
-public abstract class StandaloneEventProcessorDeclarerSingleton<B extends EventProcessorBindingParams>
- extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
+@Deprecated(since = "0.90.0", forRemoval = true)
+/**
+ * @deprecated: since there is no usage
+ */
+public abstract class StandaloneEventProcessorDeclarerSingleton<T extends EventProcessorBindingParams>
+ extends EventProcessorDeclarer<T, StandaloneEventProcessorRuntime<T>> {
@Override
- public StandaloneEventProcessorRuntime<B> getRuntime(DataProcessorInvocation graph,
+ public StandaloneEventProcessorRuntime<T> getRuntime(DataProcessorInvocation graph,
ProcessingElementParameterExtractor extractor,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
- ConfiguredEventProcessor<B> configuredEngine = onInvocation(graph, extractor);
- EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>
- (configuredEngine.getBindingParams(), true, configExtractor, streamPipesClient);
+ ConfiguredEventProcessor<T> configuredEngine = onInvocation(graph, extractor);
+ EventProcessorRuntimeParams<T> runtimeParams =
+ new EventProcessorRuntimeParams<>
+ (
+ configuredEngine.getBindingParams(),
+ true,
+ configExtractor,
+ streamPipesClient
+ );
return new StandaloneEventProcessorRuntime<>(configuredEngine.getEngineSupplier(),
- runtimeParams);
+ runtimeParams);
}
- public abstract ConfiguredEventProcessor<B> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor);
+ public abstract ConfiguredEventProcessor<T> onInvocation(DataProcessorInvocation graph,
+ ProcessingElementParameterExtractor extractor);
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java
index 048b857cc..115b21199 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarer.java
@@ -28,22 +28,28 @@ import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventSinkRuntime;
-public abstract class StandaloneEventSinkDeclarer<B extends
- EventSinkBindingParams> extends EventSinkDeclarer<B, StandaloneEventSinkRuntime<B>> {
+public abstract class StandaloneEventSinkDeclarer<T extends
+ EventSinkBindingParams> extends EventSinkDeclarer<T, StandaloneEventSinkRuntime<T>> {
@Override
- public StandaloneEventSinkRuntime<B> getRuntime(DataSinkInvocation graph,
+ public StandaloneEventSinkRuntime<T> getRuntime(DataSinkInvocation graph,
DataSinkParameterExtractor extractor,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
- ConfiguredEventSink<B> configuredEngine = onInvocation(graph, extractor);
- EventSinkRuntimeParams<B> runtimeParams = new EventSinkRuntimeParams<>
- (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+ ConfiguredEventSink<T> configuredEngine = onInvocation(graph, extractor);
+ EventSinkRuntimeParams<T> runtimeParams =
+ new EventSinkRuntimeParams<>
+ (
+ configuredEngine.getBindingParams(),
+ false,
+ configExtractor,
+ streamPipesClient
+ );
return new StandaloneEventSinkRuntime<>(configuredEngine.getEngineSupplier(), runtimeParams);
}
- public abstract ConfiguredEventSink<B> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
+ public abstract ConfiguredEventSink<T> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java
index aa2a27b02..88d948b0f 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventSinkDeclarerSingleton.java
@@ -28,21 +28,27 @@ import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventSinkRuntime;
-public abstract class StandaloneEventSinkDeclarerSingleton<B extends
- EventSinkBindingParams> extends EventSinkDeclarer<B, StandaloneEventSinkRuntime<B>> {
+public abstract class StandaloneEventSinkDeclarerSingleton<T extends
+ EventSinkBindingParams> extends EventSinkDeclarer<T, StandaloneEventSinkRuntime<T>> {
@Override
- public StandaloneEventSinkRuntime<B> getRuntime(DataSinkInvocation graph,
+ public StandaloneEventSinkRuntime<T> getRuntime(DataSinkInvocation graph,
DataSinkParameterExtractor extractor,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
- ConfiguredEventSink<B> configuredEngine = onInvocation(graph, extractor);
- EventSinkRuntimeParams<B> runtimeParams = new EventSinkRuntimeParams<>
- (configuredEngine.getBindingParams(), true, configExtractor, streamPipesClient);
+ ConfiguredEventSink<T> configuredEngine = onInvocation(graph, extractor);
+ EventSinkRuntimeParams<T> runtimeParams =
+ new EventSinkRuntimeParams<>
+ (
+ configuredEngine.getBindingParams(),
+ true,
+ configExtractor,
+ streamPipesClient
+ );
return new StandaloneEventSinkRuntime<>(configuredEngine.getEngineSupplier(), runtimeParams);
}
- public abstract ConfiguredEventSink<B> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
+ public abstract ConfiguredEventSink<T> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor);
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java
index 8e020cf4e..efa85bf6e 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventProcessingDeclarer.java
@@ -27,23 +27,29 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventProcessor;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneExternalEventProcessorRuntime;
-public abstract class StandaloneExternalEventProcessingDeclarer<B extends
- EventProcessorBindingParams> extends EventProcessorDeclarer<B,
- StandaloneExternalEventProcessorRuntime<B>> {
+public abstract class StandaloneExternalEventProcessingDeclarer<T extends
+ EventProcessorBindingParams> extends EventProcessorDeclarer<T,
+ StandaloneExternalEventProcessorRuntime<T>> {
- public abstract ConfiguredExternalEventProcessor<B> onInvocation(DataProcessorInvocation graph,
- ProcessingElementParameterExtractor extractor);
+ public abstract ConfiguredExternalEventProcessor<T> onInvocation(DataProcessorInvocation graph,
+ ProcessingElementParameterExtractor extractor);
@Override
- public StandaloneExternalEventProcessorRuntime<B> getRuntime(DataProcessorInvocation graph,
+ public StandaloneExternalEventProcessorRuntime<T> getRuntime(DataProcessorInvocation graph,
ProcessingElementParameterExtractor extractor,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
- ConfiguredExternalEventProcessor<B> configuredEngine = onInvocation(graph, extractor);
- EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>
- (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+ ConfiguredExternalEventProcessor<T> configuredEngine = onInvocation(graph, extractor);
+ EventProcessorRuntimeParams<T> runtimeParams =
+ new EventProcessorRuntimeParams<>
+ (
+ configuredEngine.getBindingParams(),
+ false,
+ configExtractor,
+ streamPipesClient
+ );
return new StandaloneExternalEventProcessorRuntime<>(configuredEngine.getEngineSupplier(),
- runtimeParams);
+ runtimeParams);
}
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java
index 2815c08ec..3464955c8 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneExternalEventSinkDeclarer.java
@@ -27,24 +27,35 @@ import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventSink;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneExternalEventSinkRuntime;
-public abstract class StandaloneExternalEventSinkDeclarer<B extends
- EventSinkBindingParams> extends EventSinkDeclarer<B,
- StandaloneExternalEventSinkRuntime<B>> {
+/**
+ * @deprecated this class has no remaining usages and is marked for removal
+ * @param <T> type of the event sink binding parameters
+ */
+@Deprecated(since = "0.90.0", forRemoval = true)
+public abstract class StandaloneExternalEventSinkDeclarer<T extends
+ EventSinkBindingParams> extends EventSinkDeclarer<T,
+ StandaloneExternalEventSinkRuntime<T>> {
@Override
- public StandaloneExternalEventSinkRuntime<B> getRuntime(DataSinkInvocation graph,
+ public StandaloneExternalEventSinkRuntime<T> getRuntime(DataSinkInvocation graph,
DataSinkParameterExtractor extractor,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
- ConfiguredExternalEventSink<B> configuredEngine = onInvocation(graph, extractor);
- EventSinkRuntimeParams<B> runtimeParams = new EventSinkRuntimeParams<>
- (configuredEngine.getBindingParams(), false, configExtractor, streamPipesClient);
+ ConfiguredExternalEventSink<T> configuredEngine = onInvocation(graph, extractor);
+ EventSinkRuntimeParams<T> runtimeParams =
+ new EventSinkRuntimeParams<>
+ (
+ configuredEngine.getBindingParams(),
+ false,
+ configExtractor,
+ streamPipesClient
+ );
return new StandaloneExternalEventSinkRuntime<>(configuredEngine.getEngineSupplier(),
- runtimeParams);
+ runtimeParams);
}
- public abstract ConfiguredExternalEventSink<B> onInvocation(DataSinkInvocation graph,
+ public abstract ConfiguredExternalEventSink<T> onInvocation(DataSinkInvocation graph,
DataSinkParameterExtractor extractor);
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index d9e9aaa50..9373aed89 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -35,6 +35,7 @@ import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.wrapper.routing.RawDataProcessor;
import org.apache.streampipes.wrapper.routing.SpInputCollector;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,9 +48,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclarer, RawDataProcessor {
private static final Logger LOG = LoggerFactory.getLogger(StreamPipesFunction.class);
- private Map<String, SpInputCollector> inputCollectors;
private final Map<String, SourceInfo> sourceInfoMapper;
private final Map<String, SchemaInfo> schemaInfoMapper;
+ private Map<String, SpInputCollector> inputCollectors;
public StreamPipesFunction() {
this.sourceInfoMapper = new HashMap<>();
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
index 299be5780..19c9b1b79 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
@@ -18,31 +18,32 @@
package org.apache.streampipes.wrapper.standalone.manager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.wrapper.standalone.routing.StandaloneSpInputCollector;
import org.apache.streampipes.wrapper.standalone.routing.StandaloneSpOutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.HashMap;
import java.util.Map;
public class ProtocolManager {
+ private static final Logger LOG = LoggerFactory.getLogger(ProtocolManager.class);
public static Map<String, StandaloneSpInputCollector> consumers = new HashMap<>();
public static Map<String, StandaloneSpOutputCollector> producers = new HashMap<>();
- private static final Logger LOG = LoggerFactory.getLogger(ProtocolManager.class);
-
// TODO: currently only the topic name is used as the identifier for a consumer/producer. This
// should be replaced by a hashCode implementation in streampipes-model, but that requires
// changes to the empire serializers
public static <T extends TransportProtocol> StandaloneSpInputCollector findInputCollector(T protocol,
TransportFormat format,
- Boolean singletonEngine) throws SpRuntimeException {
+ Boolean singletonEngine)
+ throws SpRuntimeException {
if (consumers.containsKey(topicName(protocol))) {
return consumers.get(topicName(protocol));
@@ -56,14 +57,15 @@ public class ProtocolManager {
public static <T extends TransportProtocol> StandaloneSpOutputCollector findOutputCollector(T protocol,
TransportFormat format,
- String resourceId) throws SpRuntimeException {
+ String resourceId)
+ throws SpRuntimeException {
if (producers.containsKey(topicName(protocol))) {
return producers.get(topicName(protocol));
} else {
producers.put(topicName(protocol), makeOutputCollector(protocol, format, resourceId));
LOG.info("Adding new producer to producer map (size=" + producers.size() + "): " + topicName
- (protocol));
+ (protocol));
return producers.get(topicName(protocol));
}
@@ -71,13 +73,15 @@ public class ProtocolManager {
private static <T extends TransportProtocol> StandaloneSpInputCollector<T> makeInputCollector(T protocol,
TransportFormat format,
- Boolean singletonEngine) throws SpRuntimeException {
+ Boolean singletonEngine)
+ throws SpRuntimeException {
return new StandaloneSpInputCollector<>(protocol, format, singletonEngine);
}
public static <T extends TransportProtocol> StandaloneSpOutputCollector<T> makeOutputCollector(T protocol,
TransportFormat format,
- String resourceId) throws SpRuntimeException {
+ String resourceId)
+ throws SpRuntimeException {
return new StandaloneSpOutputCollector<>(protocol, format, resourceId);
}
@@ -86,17 +90,17 @@ public class ProtocolManager {
}
public static <T extends TransportProtocol> void removeInputCollector(T protocol) throws
- SpRuntimeException {
+ SpRuntimeException {
consumers.remove(topicName(protocol));
LOG.info("Removing consumer from consumer map (size=" + consumers.size() + "): " + topicName
- (protocol));
+ (protocol));
}
public static <T extends TransportProtocol> void removeOutputCollector(T protocol) throws
- SpRuntimeException {
+ SpRuntimeException {
producers.remove(topicName(protocol));
LOG.info("Removing producer from producer map (size=" + producers.size() + "): " + topicName
- (protocol));
+ (protocol));
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java
index cc7c73818..35dc44304 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpCollector.java
@@ -29,10 +29,10 @@ import org.apache.streampipes.wrapper.standalone.manager.PManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public abstract class StandaloneSpCollector<T extends TransportProtocol, C> implements
- PipelineElementCollector<C> {
+public abstract class StandaloneSpCollector<T extends TransportProtocol, W> implements
+ PipelineElementCollector<W> {
- protected Map<String, C> consumers;
+ protected Map<String, W> consumers;
protected T transportProtocol;
protected SpProtocolDefinition<T> protocolDefinition;
@@ -45,15 +45,15 @@ public abstract class StandaloneSpCollector<T extends TransportProtocol, C> impl
public StandaloneSpCollector(T protocol, TransportFormat format) throws SpRuntimeException {
this.transportProtocol = protocol;
this.protocolDefinition = PManager.getProtocolDefinition(protocol).orElseThrow(() -> new
- SpRuntimeException("Could not find protocol"));
+ SpRuntimeException("Could not find protocol"));
this.transportFormat = format;
this.dataFormatDefinition = PManager.getDataFormat(format).orElseThrow(() -> new
- SpRuntimeException("Could not find format"));
+ SpRuntimeException("Could not find format"));
this.consumers = new ConcurrentHashMap<>();
this.topic = transportProtocol.getTopicDefinition().getActualTopicName();
}
- public void registerConsumer(String routeId, C consumer) {
+ public void registerConsumer(String routeId, W consumer) {
consumers.put(routeId, consumer);
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
index 77277c6c1..eca3b78bd 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
@@ -27,9 +27,9 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
public class StandaloneSpInputCollector<T extends TransportProtocol> extends
- StandaloneSpCollector<T, RawDataProcessor>
- implements
- InternalEventProcessor<byte[]>, SpInputCollector {
+ StandaloneSpCollector<T, RawDataProcessor>
+ implements
+ InternalEventProcessor<byte[]>, SpInputCollector {
private final Boolean singletonEngine;
@@ -42,20 +42,20 @@ public class StandaloneSpInputCollector<T extends TransportProtocol> extends
@Override
public void onEvent(byte[] event) {
if (singletonEngine) {
- send(consumers.get(consumers.keySet().toArray()[0]), event);
+ send(consumers.get(consumers.keySet().toArray()[0]), event);
} else {
consumers.forEach((key, value) -> send(value, event));
}
}
private void send(RawDataProcessor rawDataProcessor, byte[] event) {
- rawDataProcessor.process(dataFormatDefinition.toMap(event), topic);
+ rawDataProcessor.process(dataFormatDefinition.toMap(event), topic);
}
@Override
public void connect() throws SpRuntimeException {
if (!protocolDefinition.getConsumer().isConnected()) {
- protocolDefinition.getConsumer().connect(transportProtocol,this);
+ protocolDefinition.getConsumer().connect(transportProtocol, this);
}
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java
index d658e58a3..c6a55a3cc 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java
@@ -30,14 +30,15 @@ import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventConverter;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class StandaloneSpOutputCollector<T extends TransportProtocol> extends
- StandaloneSpCollector<T, InternalEventProcessor<Map<String,
- Object>>> implements SpOutputCollector {
+ StandaloneSpCollector<T, InternalEventProcessor<Map<String,
+ Object>>> implements SpOutputCollector {
private static final Logger LOG = LoggerFactory.getLogger(StandaloneSpOutputCollector.class);
@@ -47,9 +48,9 @@ public class StandaloneSpOutputCollector<T extends TransportProtocol> extends
public StandaloneSpOutputCollector(T protocol,
TransportFormat format,
String resourceId) throws SpRuntimeException {
- super(protocol, format);
- this.producer = protocolDefinition.getProducer();
- this.resourceId = resourceId;
+ super(protocol, format);
+ this.producer = protocolDefinition.getProducer();
+ this.resourceId = resourceId;
}
public void collect(Event event) {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
index 60763cdc5..4d0a686d1 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
@@ -27,22 +27,23 @@ import org.apache.streampipes.wrapper.routing.SpInputCollector;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.runtime.EventProcessor;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.function.Supplier;
-public class StandaloneEventProcessorRuntime<B extends EventProcessorBindingParams> extends
- StandalonePipelineElementRuntime<B, DataProcessorInvocation,
- EventProcessorRuntimeParams<B>, EventProcessorRuntimeContext, EventProcessor<B>> {
+public class StandaloneEventProcessorRuntime<T extends EventProcessorBindingParams> extends
+ StandalonePipelineElementRuntime<T, DataProcessorInvocation,
+ EventProcessorRuntimeParams<T>, EventProcessorRuntimeContext, EventProcessor<T>> {
private static final Logger LOG = LoggerFactory.getLogger(StandaloneEventProcessorRuntime.class);
protected SpOutputCollector outputCollector;
- public StandaloneEventProcessorRuntime(Supplier<EventProcessor<B>> supplier,
- EventProcessorRuntimeParams<B> params) {
+ public StandaloneEventProcessorRuntime(Supplier<EventProcessor<T>> supplier,
+ EventProcessorRuntimeParams<T> params) {
super(supplier, params);
this.outputCollector = getOutputCollector();
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
index d706a2397..5d5156b2a 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
@@ -25,19 +25,20 @@ import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
import org.apache.streampipes.wrapper.routing.SpInputCollector;
import org.apache.streampipes.wrapper.runtime.EventSink;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.function.Supplier;
-public class StandaloneEventSinkRuntime<B extends EventSinkBindingParams> extends
- StandalonePipelineElementRuntime<B, DataSinkInvocation,
- EventSinkRuntimeParams<B>, EventSinkRuntimeContext, EventSink<B>> {
+public class StandaloneEventSinkRuntime<T extends EventSinkBindingParams> extends
+ StandalonePipelineElementRuntime<T, DataSinkInvocation,
+ EventSinkRuntimeParams<T>, EventSinkRuntimeContext, EventSink<T>> {
private static final Logger LOG = LoggerFactory.getLogger(StandaloneEventSinkRuntime.class);
- public StandaloneEventSinkRuntime(Supplier<EventSink<B>> supplier, EventSinkRuntimeParams<B>
+ public StandaloneEventSinkRuntime(Supplier<EventSink<T>> supplier, EventSinkRuntimeParams<T>
params) {
super(supplier, params);
}
@@ -55,7 +56,7 @@ public class StandaloneEventSinkRuntime<B extends EventSinkBindingParams> extend
monitoringManager.increaseInCounter(resourceId, sourceInfo, System.currentTimeMillis());
engine.onEvent(params.makeEvent(rawEvent, sourceInfo));
} catch (RuntimeException e) {
- LOG.error("RuntimeException while processing event in {}", engine.getClass().getCanonicalName() , e);
+ LOG.error("RuntimeException while processing event in {}", engine.getClass().getCanonicalName(), e);
addLogEntry(e);
}
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java
index 01c01878d..5e0dd840f 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventProcessorRuntime.java
@@ -27,12 +27,12 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
import java.util.Map;
import java.util.function.Supplier;
-public class StandaloneExternalEventProcessorRuntime<B extends EventProcessorBindingParams> extends
- StandalonePipelineElementRuntime<B, DataProcessorInvocation,
- EventProcessorRuntimeParams<B>, EventProcessorRuntimeContext, ExternalEventProcessor<B>> {
+public class StandaloneExternalEventProcessorRuntime<T extends EventProcessorBindingParams> extends
+ StandalonePipelineElementRuntime<T, DataProcessorInvocation,
+ EventProcessorRuntimeParams<T>, EventProcessorRuntimeContext, ExternalEventProcessor<T>> {
- public StandaloneExternalEventProcessorRuntime(Supplier<ExternalEventProcessor<B>> supplier,
- EventProcessorRuntimeParams<B> runtimeParams) {
+ public StandaloneExternalEventProcessorRuntime(Supplier<ExternalEventProcessor<T>> supplier,
+ EventProcessorRuntimeParams<T> runtimeParams) {
super(supplier, runtimeParams);
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java
index 418702da1..de5f4c881 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneExternalEventSinkRuntime.java
@@ -27,11 +27,12 @@ import org.apache.streampipes.wrapper.runtime.ExternalEventSink;
import java.util.Map;
import java.util.function.Supplier;
-public class StandaloneExternalEventSinkRuntime<B extends EventSinkBindingParams> extends
- StandalonePipelineElementRuntime<B, DataSinkInvocation,
- EventSinkRuntimeParams<B>, EventSinkRuntimeContext, ExternalEventSink<B>> {
+public class StandaloneExternalEventSinkRuntime<T extends EventSinkBindingParams> extends
+ StandalonePipelineElementRuntime<T, DataSinkInvocation,
+ EventSinkRuntimeParams<T>, EventSinkRuntimeContext, ExternalEventSink<T>> {
- public StandaloneExternalEventSinkRuntime(Supplier<ExternalEventSink<B>> supplier, EventSinkRuntimeParams<B> runtimeParams) {
+ public StandaloneExternalEventSinkRuntime(Supplier<ExternalEventSink<T>> supplier,
+ EventSinkRuntimeParams<T> runtimeParams) {
super(supplier, runtimeParams);
}
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
index 4a046f33e..eab751d27 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java
@@ -36,19 +36,19 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
-public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I>,
- I extends InvocableStreamPipesEntity,
- RP extends RuntimeParams<B, I, RC>,
- RC extends RuntimeContext,
- P extends PipelineElement<B, I>>
- extends PipelineElementRuntime implements RawDataProcessor {
+public abstract class StandalonePipelineElementRuntime<T extends BindingParams<K>,
+ K extends InvocableStreamPipesEntity,
+ V extends RuntimeParams<T, K, X>,
+ X extends RuntimeContext,
+ PeT extends PipelineElement<T, K>>
+ extends PipelineElementRuntime implements RawDataProcessor {
- protected RP params;
- protected final P engine;
+ protected final PeT engine;
+ protected V params;
protected SpMonitoringManager monitoringManager;
protected String resourceId;
- public StandalonePipelineElementRuntime(Supplier<P> supplier, RP runtimeParams) {
+ public StandalonePipelineElementRuntime(Supplier<PeT> supplier, V runtimeParams) {
super();
this.engine = supplier.get();
this.params = runtimeParams;
@@ -56,7 +56,7 @@ public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I
this.resourceId = params.getBindingParams().getGraph().getElementId();
}
- public P getEngine() {
+ public PeT getEngine() {
return engine;
}
@@ -69,8 +69,8 @@ public abstract class StandalonePipelineElementRuntime<B extends BindingParams<I
List<SpInputCollector> inputCollectors = new ArrayList<>();
for (SpDataStream is : params.getBindingParams().getGraph().getInputStreams()) {
inputCollectors.add(ProtocolManager.findInputCollector(is.getEventGrounding()
- .getTransportProtocol(), is.getEventGrounding().getTransportFormats().get(0),
- params.isSingletonEngine()));
+ .getTransportProtocol(), is.getEventGrounding().getTransportFormats().get(0),
+ params.isSingletonEngine()));
}
return inputCollectors;
}