You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/12/28 10:00:06 UTC
[incubator-streampipes] 01/04: [STREAMPIPES-487] Refactor Flink init classes to use service builder pattern
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit a6d78cb0f20a3ed23eb1fdce64a8f26de349d23f
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Dec 22 23:06:40 2021 +0100
[STREAMPIPES-487] Refactor Flink init classes to use service builder pattern
---
streampipes-extensions/pom.xml | 6 ++
.../pom.xml | 1 +
.../pe/flink/AllFlinkPipelineElementsInit.java | 108 +++++++--------------
.../pom.xml | 15 +++
.../aggregation/flink/AggregationFlinkInit.java | 44 +++++----
.../enricher/flink/EnricherFlinkInit.java | 46 +++++----
.../processor/geo/flink/GeoFlinkInit.java | 38 +++++---
.../detection/flink/PatternDetectionFlinkInit.java | 43 ++++----
.../statistics/flink/StatisticsFlinkInit.java | 40 ++++----
.../flink/TransformationFlinkInit.java | 47 +++++----
.../sinks/databases/flink/DatabasesFlinkInit.java | 37 ++++---
11 files changed, 224 insertions(+), 201 deletions(-)
diff --git a/streampipes-extensions/pom.xml b/streampipes-extensions/pom.xml
index da1e083..4a3c99f 100644
--- a/streampipes-extensions/pom.xml
+++ b/streampipes-extensions/pom.xml
@@ -549,6 +549,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml b/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml
index 887f7ef..2ff8f1b 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml
+++ b/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml
@@ -32,6 +32,7 @@
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-processors-aggregation-flink</artifactId>
<version>0.69.0-SNAPSHOT</version>
+ <classifier>embed</classifier>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
diff --git a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java b/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
index 59be7c4..3edfc2d 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
@@ -17,7 +17,8 @@
*/
package org.apache.streampipes.pe.flink;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -26,82 +27,45 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.pe.flink.config.Config;
-import org.apache.streampipes.processor.geo.flink.processor.gridenricher.SpatialGridEnrichmentController;
-import org.apache.streampipes.processors.aggregation.flink.processor.aggregation.AggregationController;
-import org.apache.streampipes.processors.aggregation.flink.processor.count.CountController;
-import org.apache.streampipes.processors.aggregation.flink.processor.eventcount.EventCountController;
-import org.apache.streampipes.processors.aggregation.flink.processor.rate.EventRateController;
-import org.apache.streampipes.processors.enricher.flink.processor.math.mathop.MathOpController;
-import org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop.StaticMathOpController;
-import org.apache.streampipes.processors.enricher.flink.processor.timestamp.TimestampController;
-import org.apache.streampipes.processors.enricher.flink.processor.trigonometry.TrigonometryController;
-import org.apache.streampipes.processors.enricher.flink.processor.urldereferencing.UrlDereferencingController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.PeakDetectionController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.sequence.SequenceController;
-import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
-import org.apache.streampipes.processors.statistics.flink.processor.stat.window.StatisticsSummaryControllerWindow;
-import org.apache.streampipes.processors.textmining.flink.processor.wordcount.WordCountController;
-import org.apache.streampipes.processors.transformation.flink.processor.boilerplate.BoilerplateController;
-import org.apache.streampipes.processors.transformation.flink.processor.converter.FieldConverterController;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.FieldHasherController;
-import org.apache.streampipes.processors.transformation.flink.processor.mapper.FieldMapperController;
-import org.apache.streampipes.processors.transformation.flink.processor.measurementUnitConverter.MeasurementUnitConverterController;
-import org.apache.streampipes.processors.transformation.flink.processor.rename.FieldRenamerController;
-import org.apache.streampipes.sinks.databases.flink.elasticsearch.ElasticSearchController;
+import org.apache.streampipes.processor.geo.flink.GeoFlinkInit;
+import org.apache.streampipes.processors.aggregation.flink.AggregationFlinkInit;
+import org.apache.streampipes.processors.enricher.flink.EnricherFlinkInit;
+import org.apache.streampipes.processors.pattern.detection.flink.PatternDetectionFlinkInit;
+import org.apache.streampipes.processors.statistics.flink.StatisticsFlinkInit;
+import org.apache.streampipes.processors.textmining.flink.TextMiningFlinkInit;
+import org.apache.streampipes.processors.transformation.flink.TransformationFlinkInit;
+import org.apache.streampipes.sinks.databases.flink.DatabasesFlinkInit;
public class AllFlinkPipelineElementsInit extends StandaloneModelSubmitter {
public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- // streampipes-processors-aggregation-flink
- .add(new AggregationController())
- .add(new CountController())
- .add(new EventRateController())
- .add(new EventCountController())
- // streampipes-processors-enricher-flink
- .add(new TimestampController())
- .add(new MathOpController())
- .add(new StaticMathOpController())
- .add(new UrlDereferencingController())
- .add(new TrigonometryController())
- // streampipes-processors-geo-flink
- .add(new SpatialGridEnrichmentController())
- // streampipes-processors-pattern-detection-flink
- .add(new PeakDetectionController())
- .add(new SequenceController())
- .add(new AbsenceController())
- .add(new AndController())
- // streampipes-processors-statistics-flink
- .add(new StatisticsSummaryController())
- .add(new StatisticsSummaryControllerWindow())
- // streampipes-processors-text-mining-flink
- .add(new WordCountController())
- // streampipes-processors-transformation-flink
- .add(new FieldConverterController())
- .add(new FieldHasherController())
- .add(new FieldMapperController())
- .add(new MeasurementUnitConverterController())
- .add(new FieldRenamerController())
- .add(new BoilerplateController())
- // streampipes-sinks-databases-flink
- .add(new ElasticSearchController());
-
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
-
- DeclarersSingleton.getInstance().registerProtocols(
- new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
+ new AllFlinkPipelineElementsInit().init();
+ }
- new AllFlinkPipelineElementsInit().init(Config.INSTANCE);
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("org-apache-streampipes-pe-all-flink",
+ "StreamPipes Bundled Pipeline Elements for JVM Wrapper",
+ "",
+ 8090)
+ .merge(new AggregationFlinkInit().provideServiceDefinition())
+ .merge(new EnricherFlinkInit().provideServiceDefinition())
+ .merge(new GeoFlinkInit().provideServiceDefinition())
+ .merge(new PatternDetectionFlinkInit().provideServiceDefinition())
+ .merge(new StatisticsFlinkInit().provideServiceDefinition())
+ .merge(new TextMiningFlinkInit().provideServiceDefinition())
+ .merge(new TransformationFlinkInit().provideServiceDefinition())
+ .merge(new DatabasesFlinkInit().provideServiceDefinition())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .build();
}
}
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml b/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
index 94d88c5..ce4ce40 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
@@ -107,6 +107,21 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <classifier>embed</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
index 4a41778..f276485 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.processors.aggregation.flink;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
import org.apache.streampipes.processors.aggregation.flink.processor.aggregation.AggregationController;
import org.apache.streampipes.processors.aggregation.flink.processor.count.CountController;
import org.apache.streampipes.processors.aggregation.flink.processor.eventcount.EventCountController;
@@ -36,24 +36,28 @@ import org.apache.streampipes.processors.aggregation.flink.processor.rate.EventR
public class AggregationFlinkInit extends StandaloneModelSubmitter {
public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- .add(new AggregationController())
- .add(new CountController())
- .add(new EventRateController())
- .add(new EventCountController());
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
-
- DeclarersSingleton.getInstance().registerProtocols(
- new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
-
- new AggregationFlinkInit().init(AggregationFlinkConfig.INSTANCE);
+ new AggregationFlinkInit().init();
}
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.aggregation.flink",
+ "Processors Aggregation Flink",
+ "",
+ 8090)
+ .registerPipelineElements(new AggregationController(),
+ new CountController(),
+ new EventRateController(),
+ new EventCountController())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .build();
+ }
}
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
index 093f6f4..eb6b46a 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.processors.enricher.flink;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.enricher.flink.config.EnricherFlinkConfig;
import org.apache.streampipes.processors.enricher.flink.processor.math.mathop.MathOpController;
import org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop.StaticMathOpController;
import org.apache.streampipes.processors.enricher.flink.processor.timestamp.TimestampController;
@@ -37,25 +37,29 @@ import org.apache.streampipes.processors.enricher.flink.processor.urldereferenci
public class EnricherFlinkInit extends StandaloneModelSubmitter {
public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- .add(new TimestampController())
- .add(new MathOpController())
- .add(new StaticMathOpController())
- .add(new UrlDereferencingController())
- .add(new TrigonometryController());
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
-
- DeclarersSingleton.getInstance().registerProtocols(
- new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
-
- new EnricherFlinkInit().init(EnricherFlinkConfig.INSTANCE);
+ new EnricherFlinkInit().init();
}
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.enricher.flink",
+ "Processors Enricher Flink",
+ "",
+ 8090)
+ .registerPipelineElements(new TimestampController(),
+ new MathOpController(),
+ new StaticMathOpController(),
+ new UrlDereferencingController(),
+ new TrigonometryController())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .build();
+ }
}
diff --git a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
index f531145..3e5a8ed 100644
--- a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.processor.geo.flink;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,26 +28,31 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processor.geo.flink.config.GeoFlinkConfig;
import org.apache.streampipes.processor.geo.flink.processor.gridenricher.SpatialGridEnrichmentController;
public class GeoFlinkInit extends StandaloneModelSubmitter {
public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- .add(new SpatialGridEnrichmentController());
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
-
- DeclarersSingleton.getInstance().registerProtocols(
- new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
+ new GeoFlinkInit().init();
+ }
- new GeoFlinkInit().init(GeoFlinkConfig.INSTANCE);
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.geo.flink",
+ "Processors Geo Flink",
+ "",
+ 8090)
+ .registerPipelineElements(new SpatialGridEnrichmentController())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .build();
}
+
}
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
index 8c5f0a3..1fbb189 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.processors.pattern.detection.flink;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.pattern.detection.flink.config.PatternDetectionFlinkConfig;
import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.PeakDetectionController;
@@ -36,22 +36,29 @@ import org.apache.streampipes.processors.pattern.detection.flink.processor.seque
public class PatternDetectionFlinkInit extends StandaloneModelSubmitter {
public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- .add(new PeakDetectionController())
- .add(new SequenceController())
- .add(new AbsenceController())
- .add(new AndController());
-
- DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
-
- DeclarersSingleton.getInstance().registerProtocols(
- new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
+ new PatternDetectionFlinkInit().init();
+ }
- new PatternDetectionFlinkInit().init(PatternDetectionFlinkConfig.INSTANCE);
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.patterndetection.flink",
+ "Processors Pattern Detection Flink",
+ "",
+ 8090)
+ .registerPipelineElements(new PeakDetectionController(),
+ new SequenceController(),
+ new AbsenceController(),
+ new AndController())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .build();
}
+
}
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
index 5abf0b9..8ac42b8 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.processors.statistics.flink;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,28 +28,33 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.statistics.flink.config.StatisticsFlinkConfig;
import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
import org.apache.streampipes.processors.statistics.flink.processor.stat.window.StatisticsSummaryControllerWindow;
public class StatisticsFlinkInit extends StandaloneModelSubmitter {
public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- .add(new StatisticsSummaryController())
- .add(new StatisticsSummaryControllerWindow());
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
-
- DeclarersSingleton.getInstance().registerProtocols(
- new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
+ new StatisticsFlinkInit().init();
+ }
- new StatisticsFlinkInit().init(StatisticsFlinkConfig.INSTANCE);
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.statistics.flink",
+ "Processors Statistics Flink",
+ "",
+ 8090)
+ .registerPipelineElements(new StatisticsSummaryController(),
+ new StatisticsSummaryControllerWindow())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .build();
}
+
}
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
index e3e0282..4a1a778 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.processors.transformation.flink;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.transformation.flink.config.TransformationFlinkConfig;
import org.apache.streampipes.processors.transformation.flink.processor.boilerplate.BoilerplateController;
import org.apache.streampipes.processors.transformation.flink.processor.converter.FieldConverterController;
import org.apache.streampipes.processors.transformation.flink.processor.hasher.FieldHasherController;
@@ -38,25 +38,30 @@ import org.apache.streampipes.processors.transformation.flink.processor.rename.F
public class TransformationFlinkInit extends StandaloneModelSubmitter {
public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- .add(new FieldConverterController())
- .add(new FieldHasherController())
- .add(new FieldMapperController())
- .add(new MeasurementUnitConverterController())
- .add(new FieldRenamerController())
- .add(new BoilerplateController());
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
-
- DeclarersSingleton.getInstance().registerProtocols(
- new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
+ new TransformationFlinkInit().init();
+ }
- new TransformationFlinkInit().init(TransformationFlinkConfig.INSTANCE);
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.transformation.flink",
+ "Processors Transformation Flink",
+ "",
+ 8090)
+ .registerPipelineElements(new FieldConverterController(),
+ new FieldHasherController(),
+ new FieldMapperController(),
+ new MeasurementUnitConverterController(),
+ new FieldRenamerController(),
+ new BoilerplateController())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .build();
}
}
diff --git a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
index f7e3367..6c8c90c 100644
--- a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
+++ b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.sinks.databases.flink;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,26 +28,30 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.sinks.databases.flink.config.DatabasesFlinkConfig;
import org.apache.streampipes.sinks.databases.flink.elasticsearch.ElasticSearchController;
public class DatabasesFlinkInit extends StandaloneModelSubmitter {
public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- .add(new ElasticSearchController());
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
+ new DatabasesFlinkInit().init();
+ }
- DeclarersSingleton.getInstance().registerProtocols(
- new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
-
- new DatabasesFlinkInit().init(DatabasesFlinkConfig.INSTANCE);
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("org.apache.streampipes.sinks.databases.flink",
+ "Sinks Databases Flink",
+ "",
+ 8090)
+ .registerPipelineElements(new ElasticSearchController())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .build();
}
}