You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/05/01 19:12:28 UTC

[incubator-streampipes] branch STREAMPIPES-527 created (now e7b4e499c)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch STREAMPIPES-527
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


      at e7b4e499c [STREAMPIPES-527] Use MQTT as default protocol in config, fix bug in preview

This branch includes the following new commits:

     new e7b4e499c [STREAMPIPES-527] Use MQTT as default protocol in config, fix bug in preview

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-streampipes] 01/01: [STREAMPIPES-527] Use MQTT as default protocol in config, fix bug in preview

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-527
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit e7b4e499ceecdabbf04503ed50135886d4b060cf
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun May 1 21:12:13 2022 +0200

    [STREAMPIPES-527] Use MQTT as default protocol in config, fix bug in preview
---
 .idea/runConfigurations/backend.xml                |  8 ++---
 .../standalone/backend/docker-compose.dev.yml      |  2 ++
 .../deploy/standalone/backend/docker-compose.yml   |  2 +-
 .../standalone/mosquitto/docker-compose.dev.yml    |  2 +-
 .../deploy/standalone/mosquitto/docker-compose.yml |  3 +-
 .../streampipes/config/backend/BackendConfig.java  |  2 +-
 .../config/backend/MessagingSettings.java          |  7 ++--
 .../dataexplorer/DataLakeManagementV4.java         | 18 +++++-----
 .../messaging/mqtt/AbstractMqttConnector.java      |  2 ++
 .../runtime/PipelineElementRuntimeInfoFetcher.java | 42 +++++++++-------------
 10 files changed, 41 insertions(+), 47 deletions(-)

diff --git a/.idea/runConfigurations/backend.xml b/.idea/runConfigurations/backend.xml
index ff3c77297..fb5908bab 100644
--- a/.idea/runConfigurations/backend.xml
+++ b/.idea/runConfigurations/backend.xml
@@ -2,7 +2,7 @@
   <configuration default="false" name="backend" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot">
     <module name="streampipes-backend" />
     <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.backend.StreamPipesBackendApplication" />
-    <option name="ALTERNATIVE_JRE_PATH" />
+    <option name="ALTERNATIVE_JRE_PATH" value="11" />
     <option name="SHORTEN_COMMAND_LINE" value="NONE" />
     <envs>
       <env name="SP_COUCHDB_HOST" value="localhost" />
@@ -12,11 +12,11 @@
       <env name="SP_INFLUX_PORT" value="8086" />
       <env name="SP_KAFKA_PORT" value="9094" />
       <env name="SP_JMS_HOST" value="localhost" />
-      <env name="SP_DEBUG" value="true " />
-      <env name="SP_PRIORITIZED_PROTOCOL" value="kafka" />
+      <env name="SP_DEBUG" value="true" />
+      <env name="SP_PRIORITIZED_PROTOCOL" value="mqtt" />
     </envs>
     <method v="2">
       <option name="Make" enabled="true" />
     </method>
   </configuration>
-</component>
\ No newline at end of file
+</component>
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
index 787111886..c67c3a78a 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
@@ -18,3 +18,5 @@ services:
   backend:
     ports:
       - "8030:8030"
+    environment:
+      - SP_HOST=host.docker.internal
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.yml b/installer/cli/deploy/standalone/backend/docker-compose.yml
index 4d014eb27..c67ba450e 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.yml
@@ -24,7 +24,7 @@ services:
       - backend:/root/.streampipes
       - files:/spImages
     environment:
-      - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-kafka}
+      - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-mqtt}
       - SP_MQTT_HOST=${SP_MQTT_HOST:-activemq}
     logging:
       driver: "json-file"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
index ae347c1c3..711570b8b 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
@@ -17,4 +17,4 @@ version: "3.4"
 services:
   mosquitto:
     ports:
-      - "1884:1883"
+      - "1883:1883"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
index 5e5b07af2..c936edf17 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
@@ -16,7 +16,8 @@
 version: "3.4"
 services:
   mosquitto:
-    image: eclipse-mosquitto:1.5.4
+    image: eclipse-mosquitto:2.0.14
+    command: mosquitto -c /mosquitto-no-auth.conf
     logging:
       driver: "json-file"
       options:
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index 403ac4d66..eb713c8b4 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -47,7 +47,7 @@ public enum BackendConfig {
 
     config.register(BackendConfigKeys.JMS_HOST, "activemq", "Hostname for backend service for active mq");
     config.register(BackendConfigKeys.JMS_PORT, 61616, "Port for backend service for active mq");
-    config.register(BackendConfigKeys.MQTT_HOST, "activemq", "Hostname of mqtt service");
+    config.register(BackendConfigKeys.MQTT_HOST, "mosquitto", "Hostname of mqtt service");
     config.register(BackendConfigKeys.MQTT_PORT, 1883, "Port of mqtt service");
     config.register(BackendConfigKeys.KAFKA_HOST, "kafka", "Hostname for backend service for kafka");
     config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
index 6b49a0e05..2c37e6f44 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -34,9 +34,6 @@ public class MessagingSettings {
     List<SpProtocol> protocolList;
     if (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL) != null) {
       switch (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL).toLowerCase()) {
-        case "mqtt":
-          protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
-          break;
         case "kafka":
           protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
           break;
@@ -44,10 +41,10 @@ public class MessagingSettings {
           protocolList = Arrays.asList(SpProtocol.JMS, SpProtocol.KAFKA, SpProtocol.MQTT);
           break;
         default:
-          protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+          protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
       }
     } else {
-      protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+      protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
     }
 
     return new MessagingSettings(
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 1b72bcb93..b5c68df45 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -312,15 +312,17 @@ public class DataLakeManagementV4 {
                 String q = "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" +measurementId + "\" WITH KEY = \"" +f + "\"";
                 Query query = new Query(q);
                 QueryResult queryResult = influxDB.query(query);
-                queryResult.getResults().forEach(res -> {
-                    res.getSeries().forEach(series -> {
-                        if (series.getValues().size() > 0) {
-                            String field = series.getValues().get(0).get(0).toString();
-                            List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
-                            tags.put(field, values);
-                        }
+                if (queryResult.getResults() != null) {
+                    queryResult.getResults().forEach(res -> {
+                        res.getSeries().forEach(series -> {
+                            if (series.getValues().size() > 0) {
+                                String field = series.getValues().get(0).get(0).toString();
+                                List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+                                tags.put(field, values);
+                            }
+                        });
                     });
-                });
+                }
         });
 
         return tags;
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
index 6ba637a9b..dcdfbe789 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -29,6 +29,8 @@ public class AbstractMqttConnector {
   protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception {
     this.mqtt = new MQTT();
     this.mqtt.setHost(makeBrokerUrl(protocolSettings));
+    this.mqtt.setConnectAttemptsMax(3);
+    this.mqtt.setReconnectDelay(1000);
     this.connection = mqtt.blockingConnection();
     this.connection.connect();
     this.connected = true;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index e7fac0ba0..d1247a15b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.manager.runtime;
 
+import org.apache.streampipes.commons.constants.Envs;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
@@ -27,8 +28,6 @@ import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,8 +35,6 @@ import java.util.Map;
 public enum PipelineElementRuntimeInfoFetcher {
   INSTANCE;
 
-  Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
-
   private final int FETCH_INTERVAL_MS = 300;
   private final Map<String, SpDataFormatConverter> converterMap;
 
@@ -49,8 +46,7 @@ public enum PipelineElementRuntimeInfoFetcher {
 
     if (spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) {
       return getLatestEventFromKafka(spDataStream);
-    }
-    else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol){
+    } else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol) {
       return getLatestEventFromJms(spDataStream);
     } else {
       return getLatestEventFromMqtt(spDataStream);
@@ -75,7 +71,7 @@ public enum PipelineElementRuntimeInfoFetcher {
     JmsTransportProtocol protocol = (JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
     // Change jms config when running in development mode
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       protocol.setBrokerHostname("localhost");
     }
     if (!converterMap.containsKey(jmsTopic)) {
@@ -113,7 +109,7 @@ public enum PipelineElementRuntimeInfoFetcher {
     MqttTransportProtocol protocol = (MqttTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
     // Change mqtt config when running in development mode
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       protocol.setBrokerHostname("localhost");
     }
 
@@ -122,15 +118,12 @@ public enum PipelineElementRuntimeInfoFetcher {
               new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
     }
     MqttConsumer mqttConsumer = new MqttConsumer();
-    mqttConsumer.connect(protocol, new InternalEventProcessor<byte[]>() {
-      @Override
-      public void onEvent(byte[] event) {
-        try {
-          result[0] = converterMap.get(mqttTopic).convert(event);
-          mqttConsumer.disconnect();
-        } catch (SpRuntimeException e) {
-          e.printStackTrace();
-        }
+    mqttConsumer.connect(protocol, event -> {
+      try {
+        result[0] = converterMap.get(mqttTopic).convert(event);
+        mqttConsumer.disconnect();
+      } catch (SpRuntimeException e) {
+        e.printStackTrace();
       }
     });
 
@@ -151,7 +144,7 @@ public enum PipelineElementRuntimeInfoFetcher {
     KafkaTransportProtocol protocol = (KafkaTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
     // Change kafka config when running in development mode
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       protocol.setBrokerHostname("localhost");
       protocol.setKafkaPort(9094);
     }
@@ -161,14 +154,11 @@ public enum PipelineElementRuntimeInfoFetcher {
               new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
     }
 
-    SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, new InternalEventProcessor<byte[]>() {
-      @Override
-      public void onEvent(byte[] event) {
-        try {
-          result[0] = converterMap.get(kafkaTopic).convert(event);
-        } catch (SpRuntimeException e) {
-          e.printStackTrace();
-        }
+    SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, event -> {
+      try {
+        result[0] = converterMap.get(kafkaTopic).convert(event);
+      } catch (SpRuntimeException e) {
+        e.printStackTrace();
       }
     });