You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/05/01 19:12:29 UTC

[incubator-streampipes] 01/01: [STREAMPIPES-527] Use MQTT as default protocol in config, fix bug in preview

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-527
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit e7b4e499ceecdabbf04503ed50135886d4b060cf
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun May 1 21:12:13 2022 +0200

    [STREAMPIPES-527] Use MQTT as default protocol in config, fix bug in preview
---
 .idea/runConfigurations/backend.xml                |  8 ++---
 .../standalone/backend/docker-compose.dev.yml      |  2 ++
 .../deploy/standalone/backend/docker-compose.yml   |  2 +-
 .../standalone/mosquitto/docker-compose.dev.yml    |  2 +-
 .../deploy/standalone/mosquitto/docker-compose.yml |  3 +-
 .../streampipes/config/backend/BackendConfig.java  |  2 +-
 .../config/backend/MessagingSettings.java          |  7 ++--
 .../dataexplorer/DataLakeManagementV4.java         | 18 +++++-----
 .../messaging/mqtt/AbstractMqttConnector.java      |  2 ++
 .../runtime/PipelineElementRuntimeInfoFetcher.java | 42 +++++++++-------------
 10 files changed, 41 insertions(+), 47 deletions(-)

diff --git a/.idea/runConfigurations/backend.xml b/.idea/runConfigurations/backend.xml
index ff3c77297..fb5908bab 100644
--- a/.idea/runConfigurations/backend.xml
+++ b/.idea/runConfigurations/backend.xml
@@ -2,7 +2,7 @@
   <configuration default="false" name="backend" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot">
     <module name="streampipes-backend" />
     <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.backend.StreamPipesBackendApplication" />
-    <option name="ALTERNATIVE_JRE_PATH" />
+    <option name="ALTERNATIVE_JRE_PATH" value="11" />
     <option name="SHORTEN_COMMAND_LINE" value="NONE" />
     <envs>
       <env name="SP_COUCHDB_HOST" value="localhost" />
@@ -12,11 +12,11 @@
       <env name="SP_INFLUX_PORT" value="8086" />
       <env name="SP_KAFKA_PORT" value="9094" />
       <env name="SP_JMS_HOST" value="localhost" />
-      <env name="SP_DEBUG" value="true " />
-      <env name="SP_PRIORITIZED_PROTOCOL" value="kafka" />
+      <env name="SP_DEBUG" value="true" />
+      <env name="SP_PRIORITIZED_PROTOCOL" value="mqtt" />
     </envs>
     <method v="2">
       <option name="Make" enabled="true" />
     </method>
   </configuration>
-</component>
\ No newline at end of file
+</component>
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
index 787111886..c67c3a78a 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
@@ -18,3 +18,5 @@ services:
   backend:
     ports:
       - "8030:8030"
+    environment:
+      - SP_HOST=host.docker.internal
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.yml b/installer/cli/deploy/standalone/backend/docker-compose.yml
index 4d014eb27..c67ba450e 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.yml
@@ -24,7 +24,7 @@ services:
       - backend:/root/.streampipes
       - files:/spImages
     environment:
-      - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-kafka}
+      - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-mqtt}
       - SP_MQTT_HOST=${SP_MQTT_HOST:-activemq}
     logging:
       driver: "json-file"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
index ae347c1c3..711570b8b 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
@@ -17,4 +17,4 @@ version: "3.4"
 services:
   mosquitto:
     ports:
-      - "1884:1883"
+      - "1883:1883"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
index 5e5b07af2..c936edf17 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
@@ -16,7 +16,8 @@
 version: "3.4"
 services:
   mosquitto:
-    image: eclipse-mosquitto:1.5.4
+    image: eclipse-mosquitto:2.0.14
+    command: mosquitto -c /mosquitto-no-auth.conf
     logging:
       driver: "json-file"
       options:
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index 403ac4d66..eb713c8b4 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -47,7 +47,7 @@ public enum BackendConfig {
 
     config.register(BackendConfigKeys.JMS_HOST, "activemq", "Hostname for backend service for active mq");
     config.register(BackendConfigKeys.JMS_PORT, 61616, "Port for backend service for active mq");
-    config.register(BackendConfigKeys.MQTT_HOST, "activemq", "Hostname of mqtt service");
+    config.register(BackendConfigKeys.MQTT_HOST, "mosquitto", "Hostname of mqtt service");
     config.register(BackendConfigKeys.MQTT_PORT, 1883, "Port of mqtt service");
     config.register(BackendConfigKeys.KAFKA_HOST, "kafka", "Hostname for backend service for kafka");
     config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
index 6b49a0e05..2c37e6f44 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -34,9 +34,6 @@ public class MessagingSettings {
     List<SpProtocol> protocolList;
     if (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL) != null) {
       switch (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL).toLowerCase()) {
-        case "mqtt":
-          protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
-          break;
         case "kafka":
           protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
           break;
@@ -44,10 +41,10 @@ public class MessagingSettings {
           protocolList = Arrays.asList(SpProtocol.JMS, SpProtocol.KAFKA, SpProtocol.MQTT);
           break;
         default:
-          protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+          protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
       }
     } else {
-      protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+      protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
     }
 
     return new MessagingSettings(
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 1b72bcb93..b5c68df45 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -312,15 +312,17 @@ public class DataLakeManagementV4 {
                 String q = "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" +measurementId + "\" WITH KEY = \"" +f + "\"";
                 Query query = new Query(q);
                 QueryResult queryResult = influxDB.query(query);
-                queryResult.getResults().forEach(res -> {
-                    res.getSeries().forEach(series -> {
-                        if (series.getValues().size() > 0) {
-                            String field = series.getValues().get(0).get(0).toString();
-                            List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
-                            tags.put(field, values);
-                        }
+                if (queryResult.getResults() != null) {
+                    queryResult.getResults().forEach(res -> {
+                        res.getSeries().forEach(series -> {
+                            if (series.getValues().size() > 0) {
+                                String field = series.getValues().get(0).get(0).toString();
+                                List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+                                tags.put(field, values);
+                            }
+                        });
                     });
-                });
+                }
         });
 
         return tags;
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
index 6ba637a9b..dcdfbe789 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -29,6 +29,8 @@ public class AbstractMqttConnector {
   protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception {
     this.mqtt = new MQTT();
     this.mqtt.setHost(makeBrokerUrl(protocolSettings));
+    this.mqtt.setConnectAttemptsMax(3);
+    this.mqtt.setReconnectDelay(1000);
     this.connection = mqtt.blockingConnection();
     this.connection.connect();
     this.connected = true;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index e7fac0ba0..d1247a15b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.manager.runtime;
 
+import org.apache.streampipes.commons.constants.Envs;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
@@ -27,8 +28,6 @@ import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,8 +35,6 @@ import java.util.Map;
 public enum PipelineElementRuntimeInfoFetcher {
   INSTANCE;
 
-  Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
-
   private final int FETCH_INTERVAL_MS = 300;
   private final Map<String, SpDataFormatConverter> converterMap;
 
@@ -49,8 +46,7 @@ public enum PipelineElementRuntimeInfoFetcher {
 
     if (spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) {
       return getLatestEventFromKafka(spDataStream);
-    }
-    else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol){
+    } else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol) {
       return getLatestEventFromJms(spDataStream);
     } else {
       return getLatestEventFromMqtt(spDataStream);
@@ -75,7 +71,7 @@ public enum PipelineElementRuntimeInfoFetcher {
     JmsTransportProtocol protocol = (JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
     // Change jms config when running in development mode
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       protocol.setBrokerHostname("localhost");
     }
     if (!converterMap.containsKey(jmsTopic)) {
@@ -113,7 +109,7 @@ public enum PipelineElementRuntimeInfoFetcher {
     MqttTransportProtocol protocol = (MqttTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
     // Change mqtt config when running in development mode
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       protocol.setBrokerHostname("localhost");
     }
 
@@ -122,15 +118,12 @@ public enum PipelineElementRuntimeInfoFetcher {
               new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
     }
     MqttConsumer mqttConsumer = new MqttConsumer();
-    mqttConsumer.connect(protocol, new InternalEventProcessor<byte[]>() {
-      @Override
-      public void onEvent(byte[] event) {
-        try {
-          result[0] = converterMap.get(mqttTopic).convert(event);
-          mqttConsumer.disconnect();
-        } catch (SpRuntimeException e) {
-          e.printStackTrace();
-        }
+    mqttConsumer.connect(protocol, event -> {
+      try {
+        result[0] = converterMap.get(mqttTopic).convert(event);
+        mqttConsumer.disconnect();
+      } catch (SpRuntimeException e) {
+        e.printStackTrace();
       }
     });
 
@@ -151,7 +144,7 @@ public enum PipelineElementRuntimeInfoFetcher {
     KafkaTransportProtocol protocol = (KafkaTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
     // Change kafka config when running in development mode
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       protocol.setBrokerHostname("localhost");
       protocol.setKafkaPort(9094);
     }
@@ -161,14 +154,11 @@ public enum PipelineElementRuntimeInfoFetcher {
               new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
     }
 
-    SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, new InternalEventProcessor<byte[]>() {
-      @Override
-      public void onEvent(byte[] event) {
-        try {
-          result[0] = converterMap.get(kafkaTopic).convert(event);
-        } catch (SpRuntimeException e) {
-          e.printStackTrace();
-        }
+    SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, event -> {
+      try {
+        result[0] = converterMap.get(kafkaTopic).convert(event);
+      } catch (SpRuntimeException e) {
+        e.printStackTrace();
       }
     });