You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/11 09:17:29 UTC

[incubator-streampipes-extensions] branch STREAMPIPES-319 updated: [STREAMPIPES-319] Refactor service definition

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/STREAMPIPES-319 by this push:
     new 1bd9622  [STREAMPIPES-319] Refactor service definition
1bd9622 is described below

commit 1bd9622d964c06969279359f6652524758c03fea
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Tue May 11 11:17:15 2021 +0200

    [STREAMPIPES-319] Refactor service definition
---
 .../sinks/brokers/jvm/BrokersJvmInit.java          | 44 ++++++++++------------
 streampipes-sinks-notifications-jvm/pom.xml        |  2 +-
 2 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index b27d8c1..e11b8b5 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.sinks.brokers.jvm;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -28,10 +29,8 @@ import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestController;
-import org.apache.streampipes.sinks.brokers.jvm.config.BrokersJvmConfig;
 import org.apache.streampipes.sinks.brokers.jvm.jms.JmsController;
 import org.apache.streampipes.sinks.brokers.jvm.kafka.KafkaController;
-import org.apache.streampipes.sinks.brokers.jvm.mqtt.MqttPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.nats.NatsController;
 import org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarController;
 import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqController;
@@ -41,28 +40,25 @@ import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
 public class BrokersJvmInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new KafkaController())
-            .add(new JmsController())
-            .add(new RestController())
-            .add(new BufferRestController())
-            .add(new RabbitMqController())
-            .add(new MqttPublisherSink())
-            .add(new WebsocketServerSink())
-            .add(new PulsarController())
-            .add(new NatsController());
 
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
+    SpServiceDefinition serviceDef = SpServiceDefinitionBuilder.create("org.apache.streampipes.sinks.brokers.jvm", "Sinks Brokers JVM", "", 8096)
+            .registerPipelineElements(new KafkaController(),
+                    new JmsController(),
+                    new RestController(),
+                    new BufferRestController(),
+                    new RabbitMqController(),
+                    new WebsocketServerSink(),
+                    new PulsarController(),
+                    new NatsController())
+            .registerMessagingFormats(new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
 
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
-
-    new BrokersJvmInit().init(BrokersJvmConfig.INSTANCE);
+    new BrokersJvmInit().init(serviceDef);
   }
 }
diff --git a/streampipes-sinks-notifications-jvm/pom.xml b/streampipes-sinks-notifications-jvm/pom.xml
index 1b6b29c..05f0367 100644
--- a/streampipes-sinks-notifications-jvm/pom.xml
+++ b/streampipes-sinks-notifications-jvm/pom.xml
@@ -121,4 +121,4 @@
         <finalName>streampipes-sinks-notifications-jvm</finalName>
 
     </build>
-</project>
\ No newline at end of file
+</project>