You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/04 21:58:06 UTC

[incubator-streampipes] branch STREAMPIPES-252 updated: [STREAMPIPES-252] add SP_PRIORITIZED_PROTOCOL env variable

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch STREAMPIPES-252
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-252 by this push:
     new 4d3cdcd  [STREAMPIPES-252] add SP_PRIORITIZED_PROTOCOL env variable
4d3cdcd is described below

commit 4d3cdcd59a9b489eaf8921aae6b9e555c411b007
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Nov 4 22:57:50 2020 +0100

    [STREAMPIPES-252] add SP_PRIORITIZED_PROTOCOL env variable
---
 .../config/backend/BackendConfigKeys.java          |  2 ++
 .../config/backend/MessagingSettings.java          | 22 +++++++++++++++++++++-
 .../AbstractNotificationSubscriber.java            |  2 +-
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
index 38b5a2d..257427a 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
@@ -49,4 +49,6 @@ public class BackendConfigKeys {
 
 
   public static final String SERVICE_NAME = "SP_SERVICE_NAME";
+
+  public static final String PRIORITIZED_PROTOCOL = "SP_PRIORITIZED_PROTOCOL";
 }
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
index ec136e4..54817ce 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.config.backend;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -31,10 +32,29 @@ public class MessagingSettings {
   private List<SpProtocol> prioritizedProtocols;
 
   public static MessagingSettings fromDefault() {
+    List<SpProtocol> protocolList;
+    if (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL) != null) {
+      switch (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL).toLowerCase()) {
+        case "mqtt":
+          protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
+          break;
+        case "kafka":
+          protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+          break;
+        case "jms":
+          protocolList = Arrays.asList(SpProtocol.JMS, SpProtocol.KAFKA, SpProtocol.MQTT);
+          break;
+        default:
+          protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+      }
+    } else {
+      protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+    }
+
     return new MessagingSettings(
             1638400, 5000012, 20, 2,
             Arrays.asList(SpDataFormat.JSON, SpDataFormat.CBOR, SpDataFormat.FST, SpDataFormat.SMILE),
-            Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS));
+            protocolList);
   }
 
   public MessagingSettings(Integer batchSize, Integer messageMaxBytes, Integer lingerMs,
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/AbstractNotificationSubscriber.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/AbstractNotificationSubscriber.java
index 71c72ca..66bae64 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/AbstractNotificationSubscriber.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/AbstractNotificationSubscriber.java
@@ -48,7 +48,7 @@ public abstract class AbstractNotificationSubscriber implements InternalEventPro
     private JmsTransportProtocol getConsumerSettings() {
         JmsTransportProtocol protocol = new JmsTransportProtocol();
         protocol.setPort(BackendConfig.INSTANCE.getJmsPort());
-        protocol.setBrokerHostname("tcp://" +BackendConfig.INSTANCE.getJmsHost());
+        protocol.setBrokerHostname(BackendConfig.INSTANCE.getJmsHost());
         protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
 
         return protocol;