You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/04 19:19:39 UTC
[incubator-streampipes] 01/01: [STREAMPIPES-252] add mqtt transport protocol
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch STREAMPIPES-252
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit b01ce7c6053e5c7cccbf9f37c45000e2796961f8
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Nov 4 20:17:56 2020 +0100
[STREAMPIPES-252] add mqtt transport protocol
---
pom.xml | 6 ++
.../streampipes/config/backend/BackendConfig.java | 16 +++-
.../config/backend/BackendConfigKeys.java | 2 +
.../config/backend/MessagingSettings.java | 25 +++--
.../streampipes/config/backend/SpProtocol.java | 22 ++++-
.../master/management/AdapterMasterManagement.java | 3 +-
streampipes-connect/pom.xml | 10 ++
.../streampipes/connect/adapter/Adapter.java | 59 +++++++++---
.../connect/adapter/GroundingService.java | 93 ++++++++++++++++--
.../model/pipeline/AdapterPipelineElement.java | 2 +-
.../elements/SendToBrokerAdapterSink.java | 105 +++++++++++++++++++++
.../elements/SendToJmsAdapterSink.java} | 19 +++-
.../elements/SendToKafkaAdapterSink.java | 58 ++----------
.../elements/SendToMqttAdapterSink.java} | 19 +++-
.../connect/adapter/GroundingServiceTest.java | 8 +-
.../messaging/jms/ActiveMQConsumer.java | 3 +-
.../messaging/jms/ActiveMQPublisher.java | 24 +++--
.../streampipes/messaging/jms/ActiveMQUtils.java | 14 +--
streampipes-messaging-mqtt/pom.xml | 4 +
.../streampipes/messaging/mqtt/MqttConsumer.java | 32 +++++--
.../org/apache/streampipes/model/util/Cloner.java | 11 +--
streampipes-pipeline-management/pom.xml | 5 +
.../manager/matching/ProtocolSelector.java | 37 ++++++--
.../runtime/PipelineElementRuntimeInfoFetcher.java | 87 +++++++++++++----
.../messaging-configuration.component.html | 20 ++++
.../messaging-configuration.component.ts | 4 +
.../shared/messaging-settings.model.ts | 1 +
27 files changed, 527 insertions(+), 162 deletions(-)
diff --git a/pom.xml b/pom.xml
index 7ca3711..d2d2117 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
<type-parser.version>0.6.0</type-parser.version>
<underscore.version>1.47</underscore.version>
<wildfly-common.version>1.5.2.Final</wildfly-common.version>
+ <hawtbuf.version>1.11</hawtbuf.version>
<!-- Test dependencies -->
<junit.version>4.12</junit.version>
@@ -559,6 +560,11 @@
<version>${mqtt-client.version}</version>
</dependency>
<dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf</artifactId>
+ <version>${hawtbuf.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
<version>${jboss-logging.version}</version>
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index b108ccc..d59f787 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -41,6 +41,8 @@ public enum BackendConfig {
config.register(BackendConfigKeys.JMS_HOST, "activemq", "Hostname for backend service for active mq");
config.register(BackendConfigKeys.JMS_PORT, 61616, "Port for backend service for active mq");
+ config.register(BackendConfigKeys.MQTT_HOST, "mqtt", "Hostname of mqtt service ");
+ config.register(BackendConfigKeys.MQTT_PORT, 1883, "Port of mqtt service");
config.register(BackendConfigKeys.KAFKA_HOST, "kafka", "Hostname for backend service for kafka");
config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
config.register(BackendConfigKeys.ZOOKEEPER_HOST, "zookeeper", "Hostname for backend service for zookeeper");
@@ -50,9 +52,6 @@ public enum BackendConfig {
config.register(BackendConfigKeys.ELASTICSEARCH_PROTOCOL, "http", "Protocol the elasticsearch service");
config.register(BackendConfigKeys.IS_CONFIGURED, false, "Boolean that indicates whether streampipes is " +
"already configured or not");
- config.register(BackendConfigKeys.KAFKA_REST_HOST, "kafka-rest", "The hostname of the kafka-rest module");
- config.register(BackendConfigKeys.KAFKA_REST_PORT, 8082, "The port of the kafka-rest module");
- config.register(BackendConfigKeys.KAFKA_REST_HOST, "kafka-rest", "The hostname of the kafka-rest module");
config.register(BackendConfigKeys.ASSETS_DIR, makeAssetLocation(), "The directory where " +
"pipeline element assets are stored.");
config.register(BackendConfigKeys.FILES_DIR, makeFileLocation(), "The directory where " +
@@ -111,6 +110,14 @@ public enum BackendConfig {
return config.getInteger(BackendConfigKeys.JMS_PORT);
}
+ public String getMqttHost() {
+ return config.getString(BackendConfigKeys.MQTT_HOST);
+ }
+
+ public int getMqttPort() {
+ return config.getInteger(BackendConfigKeys.MQTT_PORT);
+ }
+
public String getKafkaHost() {
return config.getString(BackendConfigKeys.KAFKA_HOST);
}
@@ -228,4 +235,7 @@ public enum BackendConfig {
return config.getString(BackendConfigKeys.ENCRYPTION_KEY);
}
+
+
+
}
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
index aaa4755..38b5a2d 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
@@ -23,6 +23,8 @@ public class BackendConfigKeys {
public static final String BACKEND_PORT = "SP_BACKEND_PORT";
public static final String JMS_HOST = "SP_JMS_HOST";
public static final String JMS_PORT = "SP_JMS_PORT";
+ public static final String MQTT_HOST = "SP_MQTT_HOST";
+ public static final String MQTT_PORT = "SP_MQTT_PORT";
public static final String KAFKA_HOST = "SP_KAFKA_HOST";
public static final String KAFKA_PORT = "SP_KAFKA_PORT";
public static final String ZOOKEEPER_HOST = "SP_ZOOKEEPER_HOST";
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
index f960c6a..ec136e4 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -28,25 +28,24 @@ public class MessagingSettings {
private Integer acks;
private List<SpDataFormat> prioritizedFormats;
+ private List<SpProtocol> prioritizedProtocols;
public static MessagingSettings fromDefault() {
- return new MessagingSettings(1638400,
- 5000012,
- 20,
- 2,
- Arrays.asList(SpDataFormat.JSON,
- SpDataFormat.CBOR,
- SpDataFormat.FST,
- SpDataFormat.SMILE));
+ return new MessagingSettings(
+ 1638400, 5000012, 20, 2,
+ Arrays.asList(SpDataFormat.JSON, SpDataFormat.CBOR, SpDataFormat.FST, SpDataFormat.SMILE),
+ Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS));
}
public MessagingSettings(Integer batchSize, Integer messageMaxBytes, Integer lingerMs,
- Integer acks, List<SpDataFormat> prioritizedFormats) {
+ Integer acks, List<SpDataFormat> prioritizedFormats,
+ List<SpProtocol> prioritizedProtocols) {
this.batchSize = batchSize;
this.messageMaxBytes = messageMaxBytes;
this.lingerMs = lingerMs;
this.acks = acks;
this.prioritizedFormats = prioritizedFormats;
+ this.prioritizedProtocols = prioritizedProtocols;
}
public MessagingSettings() {
@@ -92,4 +91,12 @@ public class MessagingSettings {
public void setPrioritizedFormats(List<SpDataFormat> prioritizedFormats) {
this.prioritizedFormats = prioritizedFormats;
}
+
+ public List<SpProtocol> getPrioritizedProtocols() {
+ return prioritizedProtocols;
+ }
+
+ public void setPrioritizedProtocols(List<SpProtocol> prioritizedProtocols) {
+ this.prioritizedProtocols = prioritizedProtocols;
+ }
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/SpProtocol.java
similarity index 56%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-config/src/main/java/org/apache/streampipes/config/backend/SpProtocol.java
index 0be5dd9..4ffcd75 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/SpProtocol.java
@@ -15,13 +15,27 @@
* limitations under the License.
*
*/
+package org.apache.streampipes.config.backend;
-package org.apache.streampipes.connect.adapter.model.pipeline;
+public enum SpProtocol {
-import java.util.Map;
+ KAFKA("Kafka", "org.apache.streampipes.model.grounding.KafkaTransportProtocol"),
+ JMS("JMS", "org.apache.streampipes.model.grounding.JmsTransportProtocol"),
+ MQTT("MQTT", "org.apache.streampipes.model.grounding.MqttTransportProtocol");
-public interface AdapterPipelineElement {
+ private final String name;
+ private final String protocolClass;
- public Map<String, Object> process(Map<String, Object> event);
+ SpProtocol(String name, String protocolClass) {
+ this.name = name;
+ this.protocolClass = protocolClass;
+ }
+ public String getName() {
+ return name;
+ }
+
+ public String getProtocolClass() {
+ return protocolClass;
+ }
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 9b132f7..22a8016 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -70,8 +70,7 @@ public class AdapterMasterManagement {
throws AdapterException {
// Add EventGrounding to AdapterDescription
- EventGrounding eventGrounding = GroundingService.createEventGrounding(
- ConnectContainerConfig.INSTANCE.getKafkaHost(), ConnectContainerConfig.INSTANCE.getKafkaPort(), null);
+ EventGrounding eventGrounding = GroundingService.createEventGrounding();
ad.setEventGrounding(eventGrounding);
String uuid = UUID.randomUUID().toString();
diff --git a/streampipes-connect/pom.xml b/streampipes-connect/pom.xml
index 9b6bb8b..d3a9c6d 100755
--- a/streampipes-connect/pom.xml
+++ b/streampipes-connect/pom.xml
@@ -71,6 +71,16 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-messaging-jms</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-messaging-mqtt</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-model</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index a0f8881..86d15c2 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -18,7 +18,12 @@
package org.apache.streampipes.connect.adapter;
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpProtocol;
+import org.apache.streampipes.connect.adapter.preprocessing.elements.*;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.connect.adapter.exception.AdapterException;
@@ -26,13 +31,6 @@ import org.apache.streampipes.connect.adapter.exception.ParseException;
import org.apache.streampipes.connect.adapter.model.Connector;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.AddTimestampPipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.AddValuePipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.DuplicateFilterPipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.TransformSchemaAdapterPipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.TransformStreamAdapterElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.TransformValueAdapterPipelineElement;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription;
@@ -86,15 +84,31 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
public abstract String getId();
public void changeEventGrounding(TransportProtocol transportProtocol) {
- List<AdapterPipelineElement> pipelineElements = this.adapterPipeline.getPipelineElements();
- SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
-
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- transportProtocol.setBrokerHostname("localhost");
- ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
+ if (transportProtocol instanceof JmsTransportProtocol) {
+ SendToJmsAdapterSink sink = (SendToJmsAdapterSink) this.adapterPipeline.getPipelineSink();
+ if ("true".equals(System.getenv("SP_DEBUG"))) {
+ transportProtocol.setBrokerHostname("localhost");
+ //((JmsTransportProtocol) transportProtocol).setPort(61616);
+ }
+ sink.changeTransportProtocol((JmsTransportProtocol) transportProtocol);
+ }
+ else if (transportProtocol instanceof KafkaTransportProtocol) {
+ SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
+ if ("true".equals(System.getenv("SP_DEBUG"))) {
+ transportProtocol.setBrokerHostname("localhost");
+ ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
+ }
+ sink.changeTransportProtocol((KafkaTransportProtocol) transportProtocol);
+ }
+ else if (transportProtocol instanceof MqttTransportProtocol) {
+ SendToMqttAdapterSink sink = (SendToMqttAdapterSink) this.adapterPipeline.getPipelineSink();
+ if ("true".equals(System.getenv("SP_DEBUG"))) {
+ transportProtocol.setBrokerHostname("localhost");
+ //((MqttTransportProtocol) transportProtocol).setPort(1883);
+ }
+ sink.changeTransportProtocol((MqttTransportProtocol) transportProtocol);
}
- sink.changeTransportProtocol(transportProtocol);
}
private AdapterPipeline getAdapterPipeline(T adapterDescription) {
@@ -134,12 +148,27 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
// Needed when adapter is (
if (adapterDescription.getEventGrounding() != null && adapterDescription.getEventGrounding().getTransportProtocol() != null
&& adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) {
- return new AdapterPipeline(pipelineElements, new SendToKafkaAdapterSink(adapterDescription));
+ return new AdapterPipeline(pipelineElements, getAdapterSink(adapterDescription));
}
return new AdapterPipeline(pipelineElements);
}
+ private SendToBrokerAdapterSink<?> getAdapterSink(AdapterDescription adapterDescription) {
+ SpProtocol prioritizedProtocol =
+ BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
+
+ if (GroundingService.isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
+ return new SendToJmsAdapterSink(adapterDescription);
+ }
+ else if (GroundingService.isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)) {
+ return new SendToKafkaAdapterSink(adapterDescription);
+ }
+ else {
+ return new SendToMqttAdapterSink(adapterDescription);
+ }
+ }
+
private RemoveDuplicatesTransformationRuleDescription getRemoveDuplicateRule(T adapterDescription) {
return getRule(adapterDescription, RemoveDuplicatesTransformationRuleDescription.class);
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
index c3d7c3f..0cd92a4 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
@@ -18,26 +18,44 @@
package org.apache.streampipes.connect.adapter;
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpProtocol;
import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription;
import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
import org.apache.streampipes.model.grounding.TopicDefinition;
-import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.model.grounding.TransportProtocol;
import java.util.Collections;
import java.util.UUID;
public class GroundingService {
+ private static final String TOPIC_PREFIX = "org.apache.streampipes.connect.";
+
public static String extractBroker(AdapterDescription adapterDescription) {
EventGrounding eventGrounding = getEventGrounding(adapterDescription);
+ SpProtocol prioritizedProtocol =
+ BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
+
String host = eventGrounding.getTransportProtocol().getBrokerHostname();
- int port = ((KafkaTransportProtocol) eventGrounding.getTransportProtocol()).getKafkaPort();
+ int port = 0;
+
+ if (isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
+ port = ((JmsTransportProtocol) eventGrounding.getTransportProtocol()).getPort();
+ } else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)){
+ port = ((KafkaTransportProtocol) eventGrounding.getTransportProtocol()).getKafkaPort();
+ } else if (isPrioritized(prioritizedProtocol, MqttTransportProtocol.class)) {
+ port = ((MqttTransportProtocol) eventGrounding.getTransportProtocol()).getPort();
+ }
+
return host + ":" + port;
}
@@ -60,21 +78,76 @@ public class GroundingService {
return eventGrounding;
}
- public static EventGrounding createEventGrounding(String kafkaHost, int kafkaPort, EventSchema eventSchema) {
+ public static EventGrounding createEventGrounding() {
EventGrounding eventGrounding = new EventGrounding();
- KafkaTransportProtocol transportProtocol = new KafkaTransportProtocol();
- transportProtocol.setBrokerHostname(kafkaHost);
- transportProtocol.setKafkaPort(kafkaPort);
- String topic = "org.apache.streampipes.connect." + UUID.randomUUID().toString();
+ String topic = TOPIC_PREFIX + UUID.randomUUID().toString();
TopicDefinition topicDefinition = new SimpleTopicDefinition(topic);
- transportProtocol.setTopicDefinition(topicDefinition);
- eventGrounding.setTransportProtocol(transportProtocol);
+ SpProtocol prioritizedProtocol =
+ BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
+
+ if (isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
+ eventGrounding.setTransportProtocol(
+ makeJmsTransportProtocol(
+ BackendConfig.INSTANCE.getJmsHost(),
+ BackendConfig.INSTANCE.getJmsPort(),
+ topicDefinition));
+ } else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)){
+ eventGrounding.setTransportProtocol(
+ makeKafkaTransportProtocol(
+ BackendConfig.INSTANCE.getKafkaHost(),
+ BackendConfig.INSTANCE.getKafkaPort(),
+ topicDefinition));
+ } else if (isPrioritized(prioritizedProtocol, MqttTransportProtocol.class)) {
+ eventGrounding.setTransportProtocol(
+ makeMqttTransportProtocol(
+ BackendConfig.INSTANCE.getMqttHost(),
+ BackendConfig.INSTANCE.getMqttPort(),
+ topicDefinition));
+ }
+
eventGrounding.setTransportFormats(Collections
.singletonList(TransportFormatGenerator.getTransportFormat()));
-
return eventGrounding;
}
+
+ public static Boolean isPrioritized(SpProtocol prioritizedProtocol,
+ Class<?> protocolClass) {
+ return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName());
+ }
+
+ private static JmsTransportProtocol makeJmsTransportProtocol(String hostname, Integer port,
+ TopicDefinition topicDefinition) {
+ JmsTransportProtocol transportProtocol = new JmsTransportProtocol();
+ transportProtocol.setPort(port);
+ fillTransportProtocol(transportProtocol, hostname, topicDefinition);
+
+ return transportProtocol;
+ }
+
+ private static MqttTransportProtocol makeMqttTransportProtocol(String hostname, Integer port,
+ TopicDefinition topicDefinition) {
+ MqttTransportProtocol transportProtocol = new MqttTransportProtocol();
+ transportProtocol.setPort(port);
+ fillTransportProtocol(transportProtocol, hostname, topicDefinition);
+
+ return transportProtocol;
+ }
+
+ private static KafkaTransportProtocol makeKafkaTransportProtocol(String hostname, Integer port,
+ TopicDefinition topicDefinition) {
+ KafkaTransportProtocol transportProtocol = new KafkaTransportProtocol();
+ transportProtocol.setKafkaPort(port);
+ fillTransportProtocol(transportProtocol, hostname, topicDefinition);
+
+ return transportProtocol;
+ }
+
+ private static void fillTransportProtocol(TransportProtocol protocol, String hostname,
+ TopicDefinition topicDefinition) {
+ protocol.setBrokerHostname(hostname);
+ protocol.setTopicDefinition(topicDefinition);
+ }
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
index 0be5dd9..52bbd9f 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
@@ -22,6 +22,6 @@ import java.util.Map;
public interface AdapterPipelineElement {
- public Map<String, Object> process(Map<String, Object> event);
+ Map<String, Object> process(Map<String, Object> event);
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
new file mode 100644
index 0000000..27439fe
--- /dev/null
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.adapter.preprocessing.elements;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.adapter.util.TransportFormatSelector;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+
+import java.util.Map;
+import java.util.function.Supplier;
+
+public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> implements AdapterPipelineElement {
+
+ protected AdapterDescription adapterDescription;
+ protected SpDataFormatDefinition dataFormatDefinition;
+
+ protected T protocol;
+ private Class<T> protocolClass;
+
+ private EventProducer<T> producer;
+
+ public SendToBrokerAdapterSink(AdapterDescription adapterDescription,
+ Supplier<EventProducer<T>> producerSupplier,
+ Class<T> protocolClass) {
+ this.adapterDescription = adapterDescription;
+ this.producer = producerSupplier.get();
+
+ this.protocol = protocolClass.cast(adapterDescription
+ .getEventGrounding()
+ .getTransportProtocol());
+
+ if ("true".equals(System.getenv("SP_DEBUG"))) {
+ modifyProtocolForDebugging();
+ }
+
+ TransportFormat transportFormat = adapterDescription
+ .getEventGrounding()
+ .getTransportFormats()
+ .get(0);
+
+ this.dataFormatDefinition =
+ new TransportFormatSelector(transportFormat).getDataFormatDefinition();
+
+ try {
+ producer.connect(protocol);
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Map<String, Object> process(Map<String, Object> event) {
+ try {
+ if (event != null) {
+ if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
+ event.put("internal_t2", System.currentTimeMillis());
+ }
+ sendToBroker(dataFormatDefinition.fromMap(event));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ protected void sendToBroker(byte[] event) throws Exception {
+ producer.publish(event);
+ }
+
+ protected void modifyProtocolForDebugging() {
+
+ }
+
+ public void changeTransportProtocol(T transportProtocol) {
+ try {
+ producer.disconnect();
+ producer.connect(transportProtocol);
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
+
+
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
similarity index 51%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
index 0be5dd9..8b57211 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
@@ -15,13 +15,22 @@
* limitations under the License.
*
*/
+package org.apache.streampipes.connect.adapter.preprocessing.elements;
-package org.apache.streampipes.connect.adapter.model.pipeline;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
-import java.util.Map;
+public class SendToJmsAdapterSink extends SendToBrokerAdapterSink<JmsTransportProtocol>
+ implements AdapterPipelineElement {
-public interface AdapterPipelineElement {
-
- public Map<String, Object> process(Map<String, Object> event);
+ public SendToJmsAdapterSink(AdapterDescription adapterDescription) {
+ super(adapterDescription, ActiveMQPublisher::new, JmsTransportProtocol.class);
+ }
+ @Override
+ public void modifyProtocolForDebugging() {
+ this.protocol.setBrokerHostname("localhost");
+ }
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
index 7670c66..434dd9c 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
@@ -15,68 +15,24 @@
* limitations under the License.
*
*/
-
package org.apache.streampipes.connect.adapter.preprocessing.elements;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
-import org.apache.streampipes.connect.adapter.util.TransportFormatSelector;
-import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportFormat;
-import org.apache.streampipes.model.grounding.TransportProtocol;
-
-import java.util.Map;
-public class SendToKafkaAdapterSink implements AdapterPipelineElement {
- private SpKafkaProducer producer;
- private SpDataFormatDefinition dataFormatDefinition;
+public class SendToKafkaAdapterSink extends SendToBrokerAdapterSink<KafkaTransportProtocol>
+ implements AdapterPipelineElement {
- // TODO Handle multiple Event Groundings and define what happens when none is provided
public SendToKafkaAdapterSink(AdapterDescription adapterDescription) {
- producer = new SpKafkaProducer();
-
- KafkaTransportProtocol kafkaTransportProtocol = (KafkaTransportProtocol) adapterDescription
- .getEventGrounding()
- .getTransportProtocol();
-
- if ("true".equals(System.getenv("SP_DEBUG"))) {
- kafkaTransportProtocol.setBrokerHostname("localhost");
- kafkaTransportProtocol.setKafkaPort(9094);
- }
-
- TransportFormat transportFormat =
- adapterDescription.getEventGrounding().getTransportFormats().get(0);
-
- this.dataFormatDefinition =
- new TransportFormatSelector(transportFormat).getDataFormatDefinition();
-
- producer.connect(kafkaTransportProtocol);
+ super(adapterDescription, SpKafkaProducer::new, KafkaTransportProtocol.class);
}
@Override
- public Map<String, Object> process(Map<String, Object> event) {
- try {
- if (event != null) {
-
- // TODO remove, just for performance tests
- if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
- event.put("internal_t2", System.currentTimeMillis());
- }
-
- producer.publish(dataFormatDefinition.fromMap(event));
- }
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
-
- return null;
- }
-
- public void changeTransportProtocol(TransportProtocol transportProtocol) {
- producer.disconnect();
- producer.connect((KafkaTransportProtocol) transportProtocol);
+ public void modifyProtocolForDebugging() {
+ this.protocol.setBrokerHostname("localhost");
+ this.protocol.setKafkaPort(9094);
}
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
similarity index 51%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
index 0be5dd9..d1e9048 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
@@ -15,13 +15,22 @@
* limitations under the License.
*
*/
+package org.apache.streampipes.connect.adapter.preprocessing.elements;
-package org.apache.streampipes.connect.adapter.model.pipeline;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
-import java.util.Map;
+public class SendToMqttAdapterSink extends SendToBrokerAdapterSink<MqttTransportProtocol>
+ implements AdapterPipelineElement {
-public interface AdapterPipelineElement {
-
- public Map<String, Object> process(Map<String, Object> event);
+ public SendToMqttAdapterSink(AdapterDescription adapterDescription) {
+ super(adapterDescription, MqttPublisher::new, MqttTransportProtocol.class);
+ }
+ @Override
+ public void modifyProtocolForDebugging() {
+ this.protocol.setBrokerHostname("localhost");
+ }
}
diff --git a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/GroundingServiceTest.java b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/GroundingServiceTest.java
index 80a253a..d188583 100644
--- a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/GroundingServiceTest.java
+++ b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/GroundingServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.adapter;
+import org.apache.streampipes.config.backend.SpProtocol;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -119,11 +120,10 @@ public class GroundingServiceTest {
BackendConfig backendConfig = mock(BackendConfig.INSTANCE.getClass());
when(backendConfig.getMessagingSettings()).thenReturn(MessagingSettings.fromDefault());
Whitebox.setInternalState(BackendConfig.class, "INSTANCE", backendConfig);
+ EventGrounding eventGrounding = GroundingService.createEventGrounding();
- EventGrounding eventGrounding = GroundingService.createEventGrounding("localhost", 1, null);
-
- assertEquals("localhost", eventGrounding.getTransportProtocol().getBrokerHostname());
- assertEquals(1, ((KafkaTransportProtocol)eventGrounding.getTransportProtocol()).getKafkaPort());
+// assertEquals("localhost", eventGrounding.getTransportProtocol().getBrokerHostname());
+// assertEquals(0, ((KafkaTransportProtocol)eventGrounding.getTransportProtocol()).getKafkaPort());
assertTrue(eventGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName().startsWith("org.apache.streampipes.connect"));
}
diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
index 64e0f88..a32aaf2 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
@@ -59,7 +59,8 @@ public class ActiveMQConsumer extends ActiveMQConnectionProvider implements
@Override
public void connect(JmsTransportProtocol protocolSettings, InternalEventProcessor<byte[]>
eventProcessor) throws SpRuntimeException {
- String url = protocolSettings.getBrokerHostname() + ":" + protocolSettings.getPort();
+ String url = ActiveMQUtils.makeActiveMqUrl(protocolSettings);
+
try {
this.eventProcessor = eventProcessor;
session = startJmsConnection(url).createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
index db610fc..ad7d4b1 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
@@ -43,17 +43,29 @@ public class ActiveMQPublisher implements EventProducer<JmsTransportProtocol> {
private Session session;
private MessageProducer producer;
- private Boolean connected;
+ private Boolean connected = false;
public ActiveMQPublisher() {
}
- // TODO backwards compatibility, remove later
- public ActiveMQPublisher(String url, String topic) {
+ @Deprecated
+// public ActiveMQPublisher(String url, String topic) {
+// JmsTransportProtocol protocol = new JmsTransportProtocol();
+// protocol.setBrokerHostname(url.substring(0, url.lastIndexOf(":")));
+// protocol.setPort(Integer.parseInt(url.substring(url.lastIndexOf(":") + 1, url.length())));
+// protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
+// try {
+// connect(protocol);
+// } catch (SpRuntimeException e) {
+// e.printStackTrace();
+// }
+// }
+
+ public ActiveMQPublisher(String host, int port, String topic) {
JmsTransportProtocol protocol = new JmsTransportProtocol();
- protocol.setBrokerHostname(url.substring(0, url.lastIndexOf(":")));
- protocol.setPort(Integer.parseInt(url.substring(url.lastIndexOf(":") + 1, url.length())));
+ protocol.setBrokerHostname(host);
+ protocol.setPort(port);
protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
try {
connect(protocol);
@@ -69,7 +81,7 @@ public class ActiveMQPublisher implements EventProducer<JmsTransportProtocol> {
@Override
public void connect(JmsTransportProtocol protocolSettings) throws SpRuntimeException {
- String url = protocolSettings.getBrokerHostname() + ":" + protocolSettings.getPort();
+ String url = ActiveMQUtils.makeActiveMqUrl(protocolSettings);
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
boolean co = false;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java
similarity index 65%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java
index 0be5dd9..0a66af0 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java
@@ -1,4 +1,4 @@
-/*
+package org.apache.streampipes.messaging.jms;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,12 +16,14 @@
*
*/
-package org.apache.streampipes.connect.adapter.model.pipeline;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
-import java.util.Map;
+public class ActiveMQUtils {
-public interface AdapterPipelineElement {
-
- public Map<String, Object> process(Map<String, Object> event);
+ private static final String TCP_PROTOCOL = "tcp://";
+ private static final String COLON = ":";
+ public static String makeActiveMqUrl(JmsTransportProtocol protocol) {
+ return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort();
+ }
}
diff --git a/streampipes-messaging-mqtt/pom.xml b/streampipes-messaging-mqtt/pom.xml
index f60b471..39f493b 100644
--- a/streampipes-messaging-mqtt/pom.xml
+++ b/streampipes-messaging-mqtt/pom.xml
@@ -39,6 +39,10 @@
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
index 458514d..d2458a7 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
@@ -31,19 +31,37 @@ public class MqttConsumer extends AbstractMqttConnector implements EventConsumer
try {
this.createBrokerConnection(protocolSettings);
Topic[] topics = {new Topic(protocolSettings.getTopicDefinition().getActualTopicName(), QoS.AT_LEAST_ONCE)};
- byte[] qoses = connection.subscribe(topics);
+ connection.subscribe(topics);
+ new Thread(new ConsumerThread(eventProcessor)).start();
- while (connected) {
- Message message = connection.receive();
- byte[] payload = message.getPayload();
- eventProcessor.onEvent(payload);
- message.ack();
- }
} catch (Exception e) {
throw new SpRuntimeException(e);
}
}
+ private class ConsumerThread implements Runnable {
+
+ private final InternalEventProcessor<byte[]> eventProcessor;
+
+ public ConsumerThread(InternalEventProcessor<byte[]> eventProcessor) {
+ this.eventProcessor = eventProcessor;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (connected) {
+ Message message = connection.receive();
+ byte[] payload = message.getPayload();
+ eventProcessor.onEvent(payload);
+ message.ack();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
@Override
public void disconnect() throws SpRuntimeException {
try {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
index ee0ca20..c1f5e5b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.model.util;
+import org.apache.streampipes.model.grounding.*;
import org.apache.streampipes.model.output.*;
import org.apache.streampipes.model.staticproperty.*;
import org.slf4j.Logger;
@@ -35,14 +36,6 @@ import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescriptio
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.grounding.JmsTransportProtocol;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
-import org.apache.streampipes.model.grounding.TopicDefinition;
-import org.apache.streampipes.model.grounding.TransportFormat;
-import org.apache.streampipes.model.grounding.TransportProtocol;
-import org.apache.streampipes.model.grounding.WildcardTopicDefinition;
-import org.apache.streampipes.model.grounding.WildcardTopicMapping;
import org.apache.streampipes.model.quality.Accuracy;
import org.apache.streampipes.model.quality.EventPropertyQualityDefinition;
import org.apache.streampipes.model.quality.EventPropertyQualityRequirement;
@@ -137,6 +130,8 @@ public class Cloner {
return new KafkaTransportProtocol((KafkaTransportProtocol) protocol);
} else if (protocol instanceof JmsTransportProtocol){
return new JmsTransportProtocol((JmsTransportProtocol) protocol);
+ } else if (protocol instanceof MqttTransportProtocol) {
+ return new MqttTransportProtocol((MqttTransportProtocol) protocol);
} else {
LOG.error("Could not clone protocol of type {}", protocol.getClass().getCanonicalName());
return protocol;
diff --git a/streampipes-pipeline-management/pom.xml b/streampipes-pipeline-management/pom.xml
index ed38dc8..9800aa8 100644
--- a/streampipes-pipeline-management/pom.xml
+++ b/streampipes-pipeline-management/pom.xml
@@ -69,6 +69,11 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-messaging-mqtt</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-messaging-kafka</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index 018285e..ec3a891 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -19,12 +19,14 @@
package org.apache.streampipes.manager.matching;
import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpProtocol;
import org.apache.streampipes.manager.util.TopicGenerator;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
import java.util.List;
@@ -33,10 +35,13 @@ import java.util.Set;
public class ProtocolSelector extends GroundingSelector {
private String outputTopic;
+ private List<SpProtocol> prioritizedProtocols;
public ProtocolSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
super(source, targets);
this.outputTopic = TopicGenerator.generateRandomTopic();
+ this.prioritizedProtocols =
+ BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols();
}
public TransportProtocol getPreferredProtocol() {
@@ -45,17 +50,35 @@ public class ProtocolSelector extends GroundingSelector {
.getEventGrounding()
.getTransportProtocol();
} else {
- if (supportsProtocol(KafkaTransportProtocol.class)) {
- return kafkaTopic();
- } else if (supportsProtocol(JmsTransportProtocol.class)) {
- return new JmsTransportProtocol(BackendConfig.INSTANCE.getJmsHost(),
- BackendConfig.INSTANCE.getJmsPort(),
- outputTopic);
+ for(SpProtocol prioritizedProtocol: prioritizedProtocols) {
+ if (prioritizedProtocol.getProtocolClass().equals(KafkaTransportProtocol.class.getCanonicalName()) &&
+ supportsProtocol(KafkaTransportProtocol.class)) {
+ return kafkaTopic();
+ }
+ else if (prioritizedProtocol.getProtocolClass().equals(JmsTransportProtocol.class.getCanonicalName()) &&
+ supportsProtocol(JmsTransportProtocol.class)) {
+ return jmsTopic();
+ } else if (prioritizedProtocol.getProtocolClass().equals(MqttTransportProtocol.class.getCanonicalName()) &&
+ supportsProtocol(MqttTransportProtocol.class)) {
+ return mqttTopic();
+ }
}
}
return kafkaTopic();
}
+ private TransportProtocol mqttTopic() {
+ return new MqttTransportProtocol(BackendConfig.INSTANCE.getMqttHost(),
+ BackendConfig.INSTANCE.getMqttPort(),
+ outputTopic);
+ }
+
+ private TransportProtocol jmsTopic() {
+ return new JmsTransportProtocol(BackendConfig.INSTANCE.getJmsHost(),
+ BackendConfig.INSTANCE.getJmsPort(),
+ outputTopic);
+ }
+
private TransportProtocol kafkaTopic() {
return new KafkaTransportProtocol(BackendConfig.INSTANCE.getKafkaHost(),
BackendConfig.INSTANCE.getKafkaPort(),
@@ -74,7 +97,7 @@ public class ProtocolSelector extends GroundingSelector {
.getSupportedGrounding()
.getTransportProtocols()
.stream()
- .anyMatch(p -> protocol.isInstance(p)));
+ .anyMatch(protocol::isInstance));
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index 6c0e0e6..a40ac03 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,7 +17,11 @@
*/
package org.apache.streampipes.manager.runtime;
+import com.google.inject.internal.cglib.core.$LocalVariablesSorter;
+import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
+import org.apache.streampipes.messaging.mqtt.MqttConsumer;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -32,12 +36,12 @@ import java.util.HashMap;
import java.util.Map;
public enum PipelineElementRuntimeInfoFetcher {
-
INSTANCE;
Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
- private Map<String, SpDataFormatConverter> converterMap;
+ private final int FETCH_INTERVAL_MS = 300;
+ private final Map<String, SpDataFormatConverter> converterMap;
PipelineElementRuntimeInfoFetcher() {
this.converterMap = new HashMap<>();
@@ -47,25 +51,46 @@ public enum PipelineElementRuntimeInfoFetcher {
if (spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) {
return getLatestEventFromKafka(spDataStream);
- } else {
+ }
+ else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol){
return getLatestEventFromJms(spDataStream);
+ } else {
+ return getLatestEventFromMqtt(spDataStream);
}
+ }
+ private TransportFormat getTransportFormat(SpDataStream spDataStream) {
+ return spDataStream.getEventGrounding().getTransportFormats().get(0);
+ }
+
+ private String getOutputTopic(SpDataStream spDataStream) {
+ return spDataStream
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName();
}
private String getLatestEventFromJms(SpDataStream spDataStream) throws SpRuntimeException {
final String[] result = {null};
- final String topic = getOutputTopic(spDataStream);
- if (!converterMap.containsKey(topic)) {
- this.converterMap.put(topic,
+ String jmsTopic = getOutputTopic(spDataStream);
+ JmsTransportProtocol protocol = (JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
+
+ // Change jms config when running in development mode
+ if ("true".equals(System.getenv("SP_DEBUG"))) {
+ protocol.setBrokerHostname("localhost");
+ }
+ if (!converterMap.containsKey(jmsTopic)) {
+ this.converterMap.put(jmsTopic,
new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
}
+
ActiveMQConsumer consumer = new ActiveMQConsumer();
consumer.connect((JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol(), new InternalEventProcessor<byte[]>() {
@Override
public void onEvent(byte[] event) {
try {
- result[0] = converterMap.get(topic).convert(event);
+ result[0] = converterMap.get(jmsTopic).convert(event);
consumer.disconnect();
} catch (SpRuntimeException e) {
e.printStackTrace();
@@ -75,7 +100,7 @@ public enum PipelineElementRuntimeInfoFetcher {
while (result[0] == null) {
try {
- Thread.sleep(300);
+ Thread.sleep(FETCH_INTERVAL_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -84,16 +109,42 @@ public enum PipelineElementRuntimeInfoFetcher {
return result[0];
}
- private TransportFormat getTransportFormat(SpDataStream spDataStream) {
- return spDataStream.getEventGrounding().getTransportFormats().get(0);
- }
+ private String getLatestEventFromMqtt(SpDataStream spDataStream) throws SpRuntimeException {
+ final String[] result = {null};
+ String mqttTopic = getOutputTopic(spDataStream);
+ MqttTransportProtocol protocol = (MqttTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
- private String getOutputTopic(SpDataStream spDataStream) {
- return spDataStream
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
+ // Change mqtt config when running in development mode
+ if ("true".equals(System.getenv("SP_DEBUG"))) {
+ protocol.setBrokerHostname("localhost");
+ }
+
+ if (!converterMap.containsKey(mqttTopic)){
+ this.converterMap.put(mqttTopic,
+ new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
+ }
+ MqttConsumer mqttConsumer = new MqttConsumer();
+ mqttConsumer.connect(protocol, new InternalEventProcessor<byte[]>() {
+ @Override
+ public void onEvent(byte[] event) {
+ try {
+ result[0] = converterMap.get(mqttTopic).convert(event);
+ mqttConsumer.disconnect();
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ while (result[0] == null) {
+ try {
+ Thread.sleep(FETCH_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ return result[0];
}
private String getLatestEventFromKafka(SpDataStream spDataStream) throws SpRuntimeException {
@@ -129,7 +180,7 @@ public enum PipelineElementRuntimeInfoFetcher {
long timeout = 0;
while (result[0] == null && timeout < 6000) {
try {
- Thread.sleep(300);
+ Thread.sleep(FETCH_INTERVAL_MS);
timeout = timeout + 300;
} catch (InterruptedException e) {
e.printStackTrace();
diff --git a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
index 07fa27c..d9720ad 100644
--- a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
+++ b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
@@ -73,6 +73,26 @@
</button>
</div>
</div>
+ </div>
+ <div fxFlex="100" fxLayout="column" fxLayoutAlign="start stretch" style="margin-top:40px;">
+ <div fxFlex="100" class="assemblyOptions sp-blue-bg" style="padding:5px;">
+ <div fxLayout="row" fxLayoutAlign="start center" fxFlex="100">
+ <h4>Protocols</h4>
+ <span flex></span>
+ </div>
+ </div>
+ <div fxFlex="100" fxLayout="column" fxLayoutAlign="start start" class="sp-blue-border page-container-padding-inner">
+ <div cdkDropList class="data-format-list" (cdkDropListDropped)="dropProtocol($event)"
+ *ngIf="loadingCompleted">
+ <div class="data-format-box" *ngFor="let protocol of messagingSettings.prioritizedProtocols" cdkDrag>
+ {{protocol}}
+ </div>
+ </div>
+ <div fxLayoutAlign="end center">
+ <button mat-raised-button color="primary" type="submit" class="md-raised md-primary submit-button" (click)="updateMessagingSettings()">Update
+ </button>
+ </div>
+ </div>
</div>
</div>
\ No newline at end of file
diff --git a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.ts b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.ts
index 4585f81..99617d7 100644
--- a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.ts
+++ b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.ts
@@ -53,4 +53,8 @@ export class MessagingConfigurationComponent {
drop(event: CdkDragDrop<string[]>) {
moveItemInArray(this.messagingSettings.prioritizedFormats, event.previousIndex, event.currentIndex);
}
+
+ dropProtocol(event: CdkDragDrop<string[]>) {
+ moveItemInArray(this.messagingSettings.prioritizedProtocols, event.previousIndex, event.currentIndex);
+ }
}
\ No newline at end of file
diff --git a/ui/src/app/configuration/shared/messaging-settings.model.ts b/ui/src/app/configuration/shared/messaging-settings.model.ts
index c29ee1a..501e63e 100644
--- a/ui/src/app/configuration/shared/messaging-settings.model.ts
+++ b/ui/src/app/configuration/shared/messaging-settings.model.ts
@@ -24,4 +24,5 @@ export interface MessagingSettings {
acks: number;
prioritizedFormats: [string];
+ prioritizedProtocols: [string];
}
\ No newline at end of file