You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/12/28 10:00:06 UTC

[incubator-streampipes] 01/04: [STREAMPIPES-487] Refactor Flink init classes to use service builder pattern

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit a6d78cb0f20a3ed23eb1fdce64a8f26de349d23f
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Dec 22 23:06:40 2021 +0100

    [STREAMPIPES-487] Refactor Flink init classes to use service builder pattern
---
 streampipes-extensions/pom.xml                     |   6 ++
 .../pom.xml                                        |   1 +
 .../pe/flink/AllFlinkPipelineElementsInit.java     | 108 +++++++--------------
 .../pom.xml                                        |  15 +++
 .../aggregation/flink/AggregationFlinkInit.java    |  44 +++++----
 .../enricher/flink/EnricherFlinkInit.java          |  46 +++++----
 .../processor/geo/flink/GeoFlinkInit.java          |  38 +++++---
 .../detection/flink/PatternDetectionFlinkInit.java |  43 ++++----
 .../statistics/flink/StatisticsFlinkInit.java      |  40 ++++----
 .../flink/TransformationFlinkInit.java             |  47 +++++----
 .../sinks/databases/flink/DatabasesFlinkInit.java  |  37 ++++---
 11 files changed, 224 insertions(+), 201 deletions(-)

diff --git a/streampipes-extensions/pom.xml b/streampipes-extensions/pom.xml
index da1e083..4a3c99f 100644
--- a/streampipes-extensions/pom.xml
+++ b/streampipes-extensions/pom.xml
@@ -549,6 +549,12 @@
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-test-utils_2.11</artifactId>
                 <version>${flink.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-slf4j-impl</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
diff --git a/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml b/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml
index 887f7ef..2ff8f1b 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml
+++ b/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml
@@ -32,6 +32,7 @@
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-processors-aggregation-flink</artifactId>
             <version>0.69.0-SNAPSHOT</version>
+            <classifier>embed</classifier>
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
diff --git a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java b/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
index 59be7c4..3edfc2d 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
@@ -17,7 +17,8 @@
  */
 package org.apache.streampipes.pe.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -26,82 +27,45 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.pe.flink.config.Config;
-import org.apache.streampipes.processor.geo.flink.processor.gridenricher.SpatialGridEnrichmentController;
-import org.apache.streampipes.processors.aggregation.flink.processor.aggregation.AggregationController;
-import org.apache.streampipes.processors.aggregation.flink.processor.count.CountController;
-import org.apache.streampipes.processors.aggregation.flink.processor.eventcount.EventCountController;
-import org.apache.streampipes.processors.aggregation.flink.processor.rate.EventRateController;
-import org.apache.streampipes.processors.enricher.flink.processor.math.mathop.MathOpController;
-import org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop.StaticMathOpController;
-import org.apache.streampipes.processors.enricher.flink.processor.timestamp.TimestampController;
-import org.apache.streampipes.processors.enricher.flink.processor.trigonometry.TrigonometryController;
-import org.apache.streampipes.processors.enricher.flink.processor.urldereferencing.UrlDereferencingController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.PeakDetectionController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.sequence.SequenceController;
-import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
-import org.apache.streampipes.processors.statistics.flink.processor.stat.window.StatisticsSummaryControllerWindow;
-import org.apache.streampipes.processors.textmining.flink.processor.wordcount.WordCountController;
-import org.apache.streampipes.processors.transformation.flink.processor.boilerplate.BoilerplateController;
-import org.apache.streampipes.processors.transformation.flink.processor.converter.FieldConverterController;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.FieldHasherController;
-import org.apache.streampipes.processors.transformation.flink.processor.mapper.FieldMapperController;
-import org.apache.streampipes.processors.transformation.flink.processor.measurementUnitConverter.MeasurementUnitConverterController;
-import org.apache.streampipes.processors.transformation.flink.processor.rename.FieldRenamerController;
-import org.apache.streampipes.sinks.databases.flink.elasticsearch.ElasticSearchController;
+import org.apache.streampipes.processor.geo.flink.GeoFlinkInit;
+import org.apache.streampipes.processors.aggregation.flink.AggregationFlinkInit;
+import org.apache.streampipes.processors.enricher.flink.EnricherFlinkInit;
+import org.apache.streampipes.processors.pattern.detection.flink.PatternDetectionFlinkInit;
+import org.apache.streampipes.processors.statistics.flink.StatisticsFlinkInit;
+import org.apache.streampipes.processors.textmining.flink.TextMiningFlinkInit;
+import org.apache.streampipes.processors.transformation.flink.TransformationFlinkInit;
+import org.apache.streampipes.sinks.databases.flink.DatabasesFlinkInit;
 
 
 public class AllFlinkPipelineElementsInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            // streampipes-processors-aggregation-flink
-            .add(new AggregationController())
-            .add(new CountController())
-            .add(new EventRateController())
-            .add(new EventCountController())
-            // streampipes-processors-enricher-flink
-            .add(new TimestampController())
-            .add(new MathOpController())
-            .add(new StaticMathOpController())
-            .add(new UrlDereferencingController())
-            .add(new TrigonometryController())
-            // streampipes-processors-geo-flink
-            .add(new SpatialGridEnrichmentController())
-            // streampipes-processors-pattern-detection-flink
-            .add(new PeakDetectionController())
-            .add(new SequenceController())
-            .add(new AbsenceController())
-            .add(new AndController())
-            // streampipes-processors-statistics-flink
-            .add(new StatisticsSummaryController())
-            .add(new StatisticsSummaryControllerWindow())
-            // streampipes-processors-text-mining-flink
-            .add(new WordCountController())
-            // streampipes-processors-transformation-flink
-            .add(new FieldConverterController())
-            .add(new FieldHasherController())
-            .add(new FieldMapperController())
-            .add(new MeasurementUnitConverterController())
-            .add(new FieldRenamerController())
-            .add(new BoilerplateController())
-            // streampipes-sinks-databases-flink
-            .add(new ElasticSearchController());
-
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new AllFlinkPipelineElementsInit().init();
+  }
 
-    new AllFlinkPipelineElementsInit().init(Config.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org-apache-streampipes-pe-all-flink",
+                    "StreamPipes Bundled Pipeline Elements for Flink Wrapper",
+                    "",
+                    8090)
+            .merge(new AggregationFlinkInit().provideServiceDefinition())
+            .merge(new EnricherFlinkInit().provideServiceDefinition())
+            .merge(new GeoFlinkInit().provideServiceDefinition())
+            .merge(new PatternDetectionFlinkInit().provideServiceDefinition())
+            .merge(new StatisticsFlinkInit().provideServiceDefinition())
+            .merge(new TextMiningFlinkInit().provideServiceDefinition())
+            .merge(new TransformationFlinkInit().provideServiceDefinition())
+            .merge(new DatabasesFlinkInit().provideServiceDefinition())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml b/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
index 94d88c5..ce4ce40 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
@@ -107,6 +107,21 @@
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <classifier>embed</classifier>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
index 4a41778..f276485 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.aggregation.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
 import org.apache.streampipes.processors.aggregation.flink.processor.aggregation.AggregationController;
 import org.apache.streampipes.processors.aggregation.flink.processor.count.CountController;
 import org.apache.streampipes.processors.aggregation.flink.processor.eventcount.EventCountController;
@@ -36,24 +36,28 @@ import org.apache.streampipes.processors.aggregation.flink.processor.rate.EventR
 public class AggregationFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new AggregationController())
-            .add(new CountController())
-            .add(new EventRateController())
-            .add(new EventCountController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
-
-    new AggregationFlinkInit().init(AggregationFlinkConfig.INSTANCE);
+    new AggregationFlinkInit().init();
   }
 
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.aggregation.flink",
+                    "Processors Aggregation Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new AggregationController(),
+                    new CountController(),
+                    new EventRateController(),
+                    new EventCountController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
+  }
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
index 093f6f4..eb6b46a 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.enricher.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.enricher.flink.config.EnricherFlinkConfig;
 import org.apache.streampipes.processors.enricher.flink.processor.math.mathop.MathOpController;
 import org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop.StaticMathOpController;
 import org.apache.streampipes.processors.enricher.flink.processor.timestamp.TimestampController;
@@ -37,25 +37,29 @@ import org.apache.streampipes.processors.enricher.flink.processor.urldereferenci
 public class EnricherFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new TimestampController())
-            .add(new MathOpController())
-            .add(new StaticMathOpController())
-            .add(new UrlDereferencingController())
-            .add(new TrigonometryController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
-
-    new EnricherFlinkInit().init(EnricherFlinkConfig.INSTANCE);
+    new EnricherFlinkInit().init();
   }
 
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.enricher.flink",
+                    "Processors Enricher Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new TimestampController(),
+                    new MathOpController(),
+                    new StaticMathOpController(),
+                    new UrlDereferencingController(),
+                    new TrigonometryController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
+  }
 }
diff --git a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
index f531145..3e5a8ed 100644
--- a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processor.geo.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,26 +28,31 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processor.geo.flink.config.GeoFlinkConfig;
 import org.apache.streampipes.processor.geo.flink.processor.gridenricher.SpatialGridEnrichmentController;
 
 public class GeoFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new SpatialGridEnrichmentController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new GeoFlinkInit().init();
+  }
 
-    new GeoFlinkInit().init(GeoFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.geo.flink",
+                    "Processors Geo Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new SpatialGridEnrichmentController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
+
 }
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
index 8c5f0a3..1fbb189 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.pattern.detection.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.pattern.detection.flink.config.PatternDetectionFlinkConfig;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.PeakDetectionController;
@@ -36,22 +36,29 @@ import org.apache.streampipes.processors.pattern.detection.flink.processor.seque
 public class PatternDetectionFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new PeakDetectionController())
-            .add(new SequenceController())
-            .add(new AbsenceController())
-            .add(new AndController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new PatternDetectionFlinkInit().init();
+  }
 
-    new PatternDetectionFlinkInit().init(PatternDetectionFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.patterndetection.flink",
+                    "Processors Pattern Detection Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new PeakDetectionController(),
+                    new SequenceController(),
+                    new AbsenceController(),
+                    new AndController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
+
 }
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
index 5abf0b9..8ac42b8 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.statistics.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,28 +28,33 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.statistics.flink.config.StatisticsFlinkConfig;
 import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
 import org.apache.streampipes.processors.statistics.flink.processor.stat.window.StatisticsSummaryControllerWindow;
 
 public class StatisticsFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new StatisticsSummaryController())
-            .add(new StatisticsSummaryControllerWindow());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new StatisticsFlinkInit().init();
+  }
 
-    new StatisticsFlinkInit().init(StatisticsFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.statistics.flink",
+                    "Processors Statistics Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new StatisticsSummaryController(),
+                    new StatisticsSummaryControllerWindow())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
+
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
index e3e0282..4a1a778 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.transformation.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.transformation.flink.config.TransformationFlinkConfig;
 import org.apache.streampipes.processors.transformation.flink.processor.boilerplate.BoilerplateController;
 import org.apache.streampipes.processors.transformation.flink.processor.converter.FieldConverterController;
 import org.apache.streampipes.processors.transformation.flink.processor.hasher.FieldHasherController;
@@ -38,25 +38,30 @@ import org.apache.streampipes.processors.transformation.flink.processor.rename.F
 public class TransformationFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new FieldConverterController())
-            .add(new FieldHasherController())
-            .add(new FieldMapperController())
-            .add(new MeasurementUnitConverterController())
-            .add(new FieldRenamerController())
-            .add(new BoilerplateController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new TransformationFlinkInit().init();
+  }
 
-    new TransformationFlinkInit().init(TransformationFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.transformation.flink",
+                    "Processors Transformation Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new FieldConverterController(),
+                    new FieldHasherController(),
+                    new FieldMapperController(),
+                    new MeasurementUnitConverterController(),
+                    new FieldRenamerController(),
+                    new BoilerplateController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
index f7e3367..6c8c90c 100644
--- a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
+++ b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.sinks.databases.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,26 +28,30 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.sinks.databases.flink.config.DatabasesFlinkConfig;
 import org.apache.streampipes.sinks.databases.flink.elasticsearch.ElasticSearchController;
 
 public class DatabasesFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new ElasticSearchController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
+    new DatabasesFlinkInit().init();
+  }
 
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
-    
-    new DatabasesFlinkInit().init(DatabasesFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.sinks.databases.flink",
+                    "Sinks Databases Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new ElasticSearchController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
 }