You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this text copy.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/05/01 19:12:29 UTC
[incubator-streampipes] 01/01: [STREAMPIPES-527] Use MQTT as default protocol in config, fix bug in preview
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-527
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit e7b4e499ceecdabbf04503ed50135886d4b060cf
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun May 1 21:12:13 2022 +0200
[STREAMPIPES-527] Use MQTT as default protocol in config, fix bug in preview
---
.idea/runConfigurations/backend.xml | 8 ++---
.../standalone/backend/docker-compose.dev.yml | 2 ++
.../deploy/standalone/backend/docker-compose.yml | 2 +-
.../standalone/mosquitto/docker-compose.dev.yml | 2 +-
.../deploy/standalone/mosquitto/docker-compose.yml | 3 +-
.../streampipes/config/backend/BackendConfig.java | 2 +-
.../config/backend/MessagingSettings.java | 7 ++--
.../dataexplorer/DataLakeManagementV4.java | 18 +++++-----
.../messaging/mqtt/AbstractMqttConnector.java | 2 ++
.../runtime/PipelineElementRuntimeInfoFetcher.java | 42 +++++++++-------------
10 files changed, 41 insertions(+), 47 deletions(-)
diff --git a/.idea/runConfigurations/backend.xml b/.idea/runConfigurations/backend.xml
index ff3c77297..fb5908bab 100644
--- a/.idea/runConfigurations/backend.xml
+++ b/.idea/runConfigurations/backend.xml
@@ -2,7 +2,7 @@
<configuration default="false" name="backend" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot">
<module name="streampipes-backend" />
<option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.backend.StreamPipesBackendApplication" />
- <option name="ALTERNATIVE_JRE_PATH" />
+ <option name="ALTERNATIVE_JRE_PATH" value="11" />
<option name="SHORTEN_COMMAND_LINE" value="NONE" />
<envs>
<env name="SP_COUCHDB_HOST" value="localhost" />
@@ -12,11 +12,11 @@
<env name="SP_INFLUX_PORT" value="8086" />
<env name="SP_KAFKA_PORT" value="9094" />
<env name="SP_JMS_HOST" value="localhost" />
- <env name="SP_DEBUG" value="true " />
- <env name="SP_PRIORITIZED_PROTOCOL" value="kafka" />
+ <env name="SP_DEBUG" value="true" />
+ <env name="SP_PRIORITIZED_PROTOCOL" value="mqtt" />
</envs>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
-</component>
\ No newline at end of file
+</component>
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
index 787111886..c67c3a78a 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
@@ -18,3 +18,5 @@ services:
backend:
ports:
- "8030:8030"
+ environment:
+ - SP_HOST=host.docker.internal
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.yml b/installer/cli/deploy/standalone/backend/docker-compose.yml
index 4d014eb27..c67ba450e 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.yml
@@ -24,7 +24,7 @@ services:
- backend:/root/.streampipes
- files:/spImages
environment:
- - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-kafka}
+ - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-mqtt}
- SP_MQTT_HOST=${SP_MQTT_HOST:-activemq}
logging:
driver: "json-file"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
index ae347c1c3..711570b8b 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
@@ -17,4 +17,4 @@ version: "3.4"
services:
mosquitto:
ports:
- - "1884:1883"
+ - "1883:1883"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
index 5e5b07af2..c936edf17 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
@@ -16,7 +16,8 @@
version: "3.4"
services:
mosquitto:
- image: eclipse-mosquitto:1.5.4
+ image: eclipse-mosquitto:2.0.14
+ command: mosquitto -c /mosquitto-no-auth.conf
logging:
driver: "json-file"
options:
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index 403ac4d66..eb713c8b4 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -47,7 +47,7 @@ public enum BackendConfig {
config.register(BackendConfigKeys.JMS_HOST, "activemq", "Hostname for backend service for active mq");
config.register(BackendConfigKeys.JMS_PORT, 61616, "Port for backend service for active mq");
- config.register(BackendConfigKeys.MQTT_HOST, "activemq", "Hostname of mqtt service");
+ config.register(BackendConfigKeys.MQTT_HOST, "mosquitto", "Hostname of mqtt service");
config.register(BackendConfigKeys.MQTT_PORT, 1883, "Port of mqtt service");
config.register(BackendConfigKeys.KAFKA_HOST, "kafka", "Hostname for backend service for kafka");
config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
index 6b49a0e05..2c37e6f44 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -34,9 +34,6 @@ public class MessagingSettings {
List<SpProtocol> protocolList;
if (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL) != null) {
switch (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL).toLowerCase()) {
- case "mqtt":
- protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
- break;
case "kafka":
protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
break;
@@ -44,10 +41,10 @@ public class MessagingSettings {
protocolList = Arrays.asList(SpProtocol.JMS, SpProtocol.KAFKA, SpProtocol.MQTT);
break;
default:
- protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+ protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
}
} else {
- protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+ protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
}
return new MessagingSettings(
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 1b72bcb93..b5c68df45 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -312,15 +312,17 @@ public class DataLakeManagementV4 {
String q = "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" +measurementId + "\" WITH KEY = \"" +f + "\"";
Query query = new Query(q);
QueryResult queryResult = influxDB.query(query);
- queryResult.getResults().forEach(res -> {
- res.getSeries().forEach(series -> {
- if (series.getValues().size() > 0) {
- String field = series.getValues().get(0).get(0).toString();
- List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
- tags.put(field, values);
- }
+ if (queryResult.getResults() != null) {
+ queryResult.getResults().forEach(res -> {
+ res.getSeries().forEach(series -> {
+ if (series.getValues().size() > 0) {
+ String field = series.getValues().get(0).get(0).toString();
+ List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+ tags.put(field, values);
+ }
+ });
});
- });
+ }
});
return tags;
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
index 6ba637a9b..dcdfbe789 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -29,6 +29,8 @@ public class AbstractMqttConnector {
protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception {
this.mqtt = new MQTT();
this.mqtt.setHost(makeBrokerUrl(protocolSettings));
+ this.mqtt.setConnectAttemptsMax(3);
+ this.mqtt.setReconnectDelay(1000);
this.connection = mqtt.blockingConnection();
this.connection.connect();
this.connected = true;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index e7fac0ba0..d1247a15b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.manager.runtime;
+import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
@@ -27,8 +28,6 @@ import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.TransportFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -36,8 +35,6 @@ import java.util.Map;
public enum PipelineElementRuntimeInfoFetcher {
INSTANCE;
- Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
-
private final int FETCH_INTERVAL_MS = 300;
private final Map<String, SpDataFormatConverter> converterMap;
@@ -49,8 +46,7 @@ public enum PipelineElementRuntimeInfoFetcher {
if (spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) {
return getLatestEventFromKafka(spDataStream);
- }
- else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol){
+ } else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol) {
return getLatestEventFromJms(spDataStream);
} else {
return getLatestEventFromMqtt(spDataStream);
@@ -75,7 +71,7 @@ public enum PipelineElementRuntimeInfoFetcher {
JmsTransportProtocol protocol = (JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
// Change jms config when running in development mode
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
protocol.setBrokerHostname("localhost");
}
if (!converterMap.containsKey(jmsTopic)) {
@@ -113,7 +109,7 @@ public enum PipelineElementRuntimeInfoFetcher {
MqttTransportProtocol protocol = (MqttTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
// Change mqtt config when running in development mode
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
protocol.setBrokerHostname("localhost");
}
@@ -122,15 +118,12 @@ public enum PipelineElementRuntimeInfoFetcher {
new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
}
MqttConsumer mqttConsumer = new MqttConsumer();
- mqttConsumer.connect(protocol, new InternalEventProcessor<byte[]>() {
- @Override
- public void onEvent(byte[] event) {
- try {
- result[0] = converterMap.get(mqttTopic).convert(event);
- mqttConsumer.disconnect();
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
+ mqttConsumer.connect(protocol, event -> {
+ try {
+ result[0] = converterMap.get(mqttTopic).convert(event);
+ mqttConsumer.disconnect();
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
}
});
@@ -151,7 +144,7 @@ public enum PipelineElementRuntimeInfoFetcher {
KafkaTransportProtocol protocol = (KafkaTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
// Change kafka config when running in development mode
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
protocol.setBrokerHostname("localhost");
protocol.setKafkaPort(9094);
}
@@ -161,14 +154,11 @@ public enum PipelineElementRuntimeInfoFetcher {
new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
}
- SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, new InternalEventProcessor<byte[]>() {
- @Override
- public void onEvent(byte[] event) {
- try {
- result[0] = converterMap.get(kafkaTopic).convert(event);
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
+ SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, event -> {
+ try {
+ result[0] = converterMap.get(kafkaTopic).convert(event);
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
}
});