You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/04 21:58:06 UTC
[incubator-streampipes] branch STREAMPIPES-252 updated:
[STREAMPIPES-252] add SP_PRIORITIZED_PROTOCOL env variable
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch STREAMPIPES-252
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-252 by this push:
new 4d3cdcd [STREAMPIPES-252] add SP_PRIORITIZED_PROTOCOL env variable
4d3cdcd is described below
commit 4d3cdcd59a9b489eaf8921aae6b9e555c411b007
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Nov 4 22:57:50 2020 +0100
[STREAMPIPES-252] add SP_PRIORITIZED_PROTOCOL env variable
---
.../config/backend/BackendConfigKeys.java | 2 ++
.../config/backend/MessagingSettings.java | 22 +++++++++++++++++++++-
.../AbstractNotificationSubscriber.java | 2 +-
3 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
index 38b5a2d..257427a 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
@@ -49,4 +49,6 @@ public class BackendConfigKeys {
public static final String SERVICE_NAME = "SP_SERVICE_NAME";
+
+ public static final String PRIORITIZED_PROTOCOL = "SP_PRIORITIZED_PROTOCOL";
}
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
index ec136e4..54817ce 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.config.backend;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -31,10 +32,29 @@ public class MessagingSettings {
private List<SpProtocol> prioritizedProtocols;
public static MessagingSettings fromDefault() {
+ List<SpProtocol> protocolList;
+ if (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL) != null) {
+ switch (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL).toLowerCase()) {
+ case "mqtt":
+ protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
+ break;
+ case "kafka":
+ protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+ break;
+ case "jms":
+ protocolList = Arrays.asList(SpProtocol.JMS, SpProtocol.KAFKA, SpProtocol.MQTT);
+ break;
+ default:
+ protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+ }
+ } else {
+ protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+ }
+
return new MessagingSettings(
1638400, 5000012, 20, 2,
Arrays.asList(SpDataFormat.JSON, SpDataFormat.CBOR, SpDataFormat.FST, SpDataFormat.SMILE),
- Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS));
+ protocolList);
}
public MessagingSettings(Integer batchSize, Integer messageMaxBytes, Integer lingerMs,
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/AbstractNotificationSubscriber.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/AbstractNotificationSubscriber.java
index 71c72ca..66bae64 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/AbstractNotificationSubscriber.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/notifications/AbstractNotificationSubscriber.java
@@ -48,7 +48,7 @@ public abstract class AbstractNotificationSubscriber implements InternalEventPro
private JmsTransportProtocol getConsumerSettings() {
JmsTransportProtocol protocol = new JmsTransportProtocol();
protocol.setPort(BackendConfig.INSTANCE.getJmsPort());
- protocol.setBrokerHostname("tcp://" +BackendConfig.INSTANCE.getJmsHost());
+ protocol.setBrokerHostname(BackendConfig.INSTANCE.getJmsHost());
protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
return protocol;