You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/04 19:19:39 UTC

[incubator-streampipes] 01/01: [STREAMPIPES-252] add mqtt transport protocol

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch STREAMPIPES-252
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit b01ce7c6053e5c7cccbf9f37c45000e2796961f8
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Nov 4 20:17:56 2020 +0100

    [STREAMPIPES-252] add mqtt transport protocol
---
 pom.xml                                            |   6 ++
 .../streampipes/config/backend/BackendConfig.java  |  16 +++-
 .../config/backend/BackendConfigKeys.java          |   2 +
 .../config/backend/MessagingSettings.java          |  25 +++--
 .../streampipes/config/backend/SpProtocol.java     |  22 ++++-
 .../master/management/AdapterMasterManagement.java |   3 +-
 streampipes-connect/pom.xml                        |  10 ++
 .../streampipes/connect/adapter/Adapter.java       |  59 +++++++++---
 .../connect/adapter/GroundingService.java          |  93 ++++++++++++++++--
 .../model/pipeline/AdapterPipelineElement.java     |   2 +-
 .../elements/SendToBrokerAdapterSink.java          | 105 +++++++++++++++++++++
 .../elements/SendToJmsAdapterSink.java}            |  19 +++-
 .../elements/SendToKafkaAdapterSink.java           |  58 ++----------
 .../elements/SendToMqttAdapterSink.java}           |  19 +++-
 .../connect/adapter/GroundingServiceTest.java      |   8 +-
 .../messaging/jms/ActiveMQConsumer.java            |   3 +-
 .../messaging/jms/ActiveMQPublisher.java           |  24 +++--
 .../streampipes/messaging/jms/ActiveMQUtils.java   |  14 +--
 streampipes-messaging-mqtt/pom.xml                 |   4 +
 .../streampipes/messaging/mqtt/MqttConsumer.java   |  32 +++++--
 .../org/apache/streampipes/model/util/Cloner.java  |  11 +--
 streampipes-pipeline-management/pom.xml            |   5 +
 .../manager/matching/ProtocolSelector.java         |  37 ++++++--
 .../runtime/PipelineElementRuntimeInfoFetcher.java |  87 +++++++++++++----
 .../messaging-configuration.component.html         |  20 ++++
 .../messaging-configuration.component.ts           |   4 +
 .../shared/messaging-settings.model.ts             |   1 +
 27 files changed, 527 insertions(+), 162 deletions(-)

diff --git a/pom.xml b/pom.xml
index 7ca3711..d2d2117 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
         <type-parser.version>0.6.0</type-parser.version>
         <underscore.version>1.47</underscore.version>
         <wildfly-common.version>1.5.2.Final</wildfly-common.version>
+        <hawtbuf.version>1.11</hawtbuf.version>
 
         <!-- Test dependencies -->
         <junit.version>4.12</junit.version>
@@ -559,6 +560,11 @@
                 <version>${mqtt-client.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.fusesource.hawtbuf</groupId>
+                <artifactId>hawtbuf</artifactId>
+                <version>${hawtbuf.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.jboss.logging</groupId>
                 <artifactId>jboss-logging</artifactId>
                 <version>${jboss-logging.version}</version>
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index b108ccc..d59f787 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -41,6 +41,8 @@ public enum BackendConfig {
 
     config.register(BackendConfigKeys.JMS_HOST, "activemq", "Hostname for backend service for active mq");
     config.register(BackendConfigKeys.JMS_PORT, 61616, "Port for backend service for active mq");
+    config.register(BackendConfigKeys.MQTT_HOST, "mqtt", "Hostname of mqtt service ");
+    config.register(BackendConfigKeys.MQTT_PORT, 1883, "Port of mqtt service");
     config.register(BackendConfigKeys.KAFKA_HOST, "kafka", "Hostname for backend service for kafka");
     config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
     config.register(BackendConfigKeys.ZOOKEEPER_HOST, "zookeeper", "Hostname for backend service for zookeeper");
@@ -50,9 +52,6 @@ public enum BackendConfig {
     config.register(BackendConfigKeys.ELASTICSEARCH_PROTOCOL, "http", "Protocol the elasticsearch service");
     config.register(BackendConfigKeys.IS_CONFIGURED, false, "Boolean that indicates whether streampipes is " +
             "already configured or not");
-    config.register(BackendConfigKeys.KAFKA_REST_HOST, "kafka-rest", "The hostname of the kafka-rest module");
-    config.register(BackendConfigKeys.KAFKA_REST_PORT, 8082, "The port of the kafka-rest module");
-    config.register(BackendConfigKeys.KAFKA_REST_HOST, "kafka-rest", "The hostname of the kafka-rest module");
     config.register(BackendConfigKeys.ASSETS_DIR, makeAssetLocation(), "The directory where " +
             "pipeline element assets are stored.");
     config.register(BackendConfigKeys.FILES_DIR, makeFileLocation(), "The directory where " +
@@ -111,6 +110,14 @@ public enum BackendConfig {
     return config.getInteger(BackendConfigKeys.JMS_PORT);
   }
 
+  public String getMqttHost() {
+    return config.getString(BackendConfigKeys.MQTT_HOST);
+  }
+
+  public int getMqttPort() {
+    return config.getInteger(BackendConfigKeys.MQTT_PORT);
+  }
+
   public String getKafkaHost() {
     return config.getString(BackendConfigKeys.KAFKA_HOST);
   }
@@ -228,4 +235,7 @@ public enum BackendConfig {
     return config.getString(BackendConfigKeys.ENCRYPTION_KEY);
   }
 
+
+
+
 }
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
index aaa4755..38b5a2d 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
@@ -23,6 +23,8 @@ public class BackendConfigKeys {
   public static final String BACKEND_PORT = "SP_BACKEND_PORT";
   public static final String JMS_HOST = "SP_JMS_HOST";
   public static final String JMS_PORT = "SP_JMS_PORT";
+  public static final String MQTT_HOST = "SP_MQTT_HOST";
+  public static final String MQTT_PORT = "SP_MQTT_PORT";
   public static final String KAFKA_HOST = "SP_KAFKA_HOST";
   public static final String KAFKA_PORT = "SP_KAFKA_PORT";
   public static final String ZOOKEEPER_HOST = "SP_ZOOKEEPER_HOST";
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
index f960c6a..ec136e4 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -28,25 +28,24 @@ public class MessagingSettings {
   private Integer acks;
 
   private List<SpDataFormat> prioritizedFormats;
+  private List<SpProtocol> prioritizedProtocols;
 
   public static MessagingSettings fromDefault() {
-    return new MessagingSettings(1638400,
-            5000012,
-            20,
-            2,
-            Arrays.asList(SpDataFormat.JSON,
-                    SpDataFormat.CBOR,
-            SpDataFormat.FST,
-            SpDataFormat.SMILE));
+    return new MessagingSettings(
+            1638400, 5000012, 20, 2,
+            Arrays.asList(SpDataFormat.JSON, SpDataFormat.CBOR, SpDataFormat.FST, SpDataFormat.SMILE),
+            Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS));
   }
 
   public MessagingSettings(Integer batchSize, Integer messageMaxBytes, Integer lingerMs,
-                           Integer acks, List<SpDataFormat> prioritizedFormats) {
+                           Integer acks, List<SpDataFormat> prioritizedFormats,
+                           List<SpProtocol> prioritizedProtocols) {
     this.batchSize = batchSize;
     this.messageMaxBytes = messageMaxBytes;
     this.lingerMs = lingerMs;
     this.acks = acks;
     this.prioritizedFormats = prioritizedFormats;
+    this.prioritizedProtocols = prioritizedProtocols;
   }
 
   public MessagingSettings() {
@@ -92,4 +91,12 @@ public class MessagingSettings {
   public void setPrioritizedFormats(List<SpDataFormat> prioritizedFormats) {
     this.prioritizedFormats = prioritizedFormats;
   }
+
+  public List<SpProtocol> getPrioritizedProtocols() {
+    return prioritizedProtocols;
+  }
+
+  public void setPrioritizedProtocols(List<SpProtocol> prioritizedProtocols) {
+    this.prioritizedProtocols = prioritizedProtocols;
+  }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/SpProtocol.java
similarity index 56%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-config/src/main/java/org/apache/streampipes/config/backend/SpProtocol.java
index 0be5dd9..4ffcd75 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/SpProtocol.java
@@ -15,13 +15,27 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.config.backend;
 
-package org.apache.streampipes.connect.adapter.model.pipeline;
+public enum SpProtocol {
 
-import java.util.Map;
+  KAFKA("Kafka", "org.apache.streampipes.model.grounding.KafkaTransportProtocol"),
+  JMS("JMS", "org.apache.streampipes.model.grounding.JmsTransportProtocol"),
+  MQTT("MQTT", "org.apache.streampipes.model.grounding.MqttTransportProtocol");
 
-public interface AdapterPipelineElement {
+  private final String name;
+  private final String protocolClass;
 
-    public Map<String, Object> process(Map<String, Object> event);
+  SpProtocol(String name, String protocolClass) {
+    this.name = name;
+    this.protocolClass = protocolClass;
+  }
 
+  public String getName() {
+    return name;
+  }
+
+  public String getProtocolClass() {
+    return protocolClass;
+  }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 9b132f7..22a8016 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -70,8 +70,7 @@ public class AdapterMasterManagement {
           throws AdapterException {
 
     // Add EventGrounding to AdapterDescription
-    EventGrounding eventGrounding = GroundingService.createEventGrounding(
-            ConnectContainerConfig.INSTANCE.getKafkaHost(), ConnectContainerConfig.INSTANCE.getKafkaPort(), null);
+    EventGrounding eventGrounding = GroundingService.createEventGrounding();
     ad.setEventGrounding(eventGrounding);
 
     String uuid = UUID.randomUUID().toString();
diff --git a/streampipes-connect/pom.xml b/streampipes-connect/pom.xml
index 9b6bb8b..d3a9c6d 100755
--- a/streampipes-connect/pom.xml
+++ b/streampipes-connect/pom.xml
@@ -71,6 +71,16 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-messaging-jms</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-messaging-mqtt</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-model</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index a0f8881..86d15c2 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -18,7 +18,12 @@
 
 package org.apache.streampipes.connect.adapter;
 
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpProtocol;
+import org.apache.streampipes.connect.adapter.preprocessing.elements.*;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.connect.adapter.exception.AdapterException;
@@ -26,13 +31,6 @@ import org.apache.streampipes.connect.adapter.exception.ParseException;
 import org.apache.streampipes.connect.adapter.model.Connector;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.AddTimestampPipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.AddValuePipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.DuplicateFilterPipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.TransformSchemaAdapterPipelineElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.TransformStreamAdapterElement;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.TransformValueAdapterPipelineElement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription;
@@ -86,15 +84,31 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
     public abstract String getId();
 
     public void changeEventGrounding(TransportProtocol transportProtocol) {
-        List<AdapterPipelineElement> pipelineElements =  this.adapterPipeline.getPipelineElements();
-        SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
 
-
-        if ("true".equals(System.getenv("SP_DEBUG"))) {
-            transportProtocol.setBrokerHostname("localhost");
-            ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
+        if (transportProtocol instanceof JmsTransportProtocol) {
+            SendToJmsAdapterSink sink = (SendToJmsAdapterSink) this.adapterPipeline.getPipelineSink();
+            if ("true".equals(System.getenv("SP_DEBUG"))) {
+                transportProtocol.setBrokerHostname("localhost");
+                //((JmsTransportProtocol) transportProtocol).setPort(61616);
+            }
+            sink.changeTransportProtocol((JmsTransportProtocol) transportProtocol);
+        }
+        else if (transportProtocol instanceof KafkaTransportProtocol) {
+            SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
+            if ("true".equals(System.getenv("SP_DEBUG"))) {
+                transportProtocol.setBrokerHostname("localhost");
+                ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
+            }
+            sink.changeTransportProtocol((KafkaTransportProtocol) transportProtocol);
+        }
+        else if (transportProtocol instanceof MqttTransportProtocol) {
+            SendToMqttAdapterSink sink = (SendToMqttAdapterSink) this.adapterPipeline.getPipelineSink();
+            if ("true".equals(System.getenv("SP_DEBUG"))) {
+                transportProtocol.setBrokerHostname("localhost");
+                //((MqttTransportProtocol) transportProtocol).setPort(1883);
+            }
+            sink.changeTransportProtocol((MqttTransportProtocol) transportProtocol);
         }
-        sink.changeTransportProtocol(transportProtocol);
     }
 
     private AdapterPipeline getAdapterPipeline(T adapterDescription) {
@@ -134,12 +148,27 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
         // Needed when adapter is (
         if (adapterDescription.getEventGrounding() != null && adapterDescription.getEventGrounding().getTransportProtocol() != null
                 && adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) {
-            return new AdapterPipeline(pipelineElements, new SendToKafkaAdapterSink(adapterDescription));
+            return new AdapterPipeline(pipelineElements, getAdapterSink(adapterDescription));
         }
 
         return new AdapterPipeline(pipelineElements);
     }
 
+    private SendToBrokerAdapterSink<?> getAdapterSink(AdapterDescription adapterDescription) {
+        SpProtocol prioritizedProtocol =
+                BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
+
+        if (GroundingService.isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
+            return new SendToJmsAdapterSink(adapterDescription);
+        }
+        else if (GroundingService.isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)) {
+            return new SendToKafkaAdapterSink(adapterDescription);
+        }
+        else {
+            return new SendToMqttAdapterSink(adapterDescription);
+        }
+    }
+
     private RemoveDuplicatesTransformationRuleDescription getRemoveDuplicateRule(T adapterDescription) {
         return getRule(adapterDescription, RemoveDuplicatesTransformationRuleDescription.class);
     }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
index c3d7c3f..0cd92a4 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
@@ -18,26 +18,44 @@
 
 package org.apache.streampipes.connect.adapter;
 
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpProtocol;
 import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription;
 import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
 import org.apache.streampipes.model.grounding.TopicDefinition;
-import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.model.grounding.TransportProtocol;
 
 import java.util.Collections;
 import java.util.UUID;
 
 public class GroundingService {
 
+    private static final String TOPIC_PREFIX = "org.apache.streampipes.connect.";
+
     public static String extractBroker(AdapterDescription adapterDescription) {
         EventGrounding eventGrounding = getEventGrounding(adapterDescription);
 
+        SpProtocol prioritizedProtocol =
+                BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
+
         String host = eventGrounding.getTransportProtocol().getBrokerHostname();
-        int port = ((KafkaTransportProtocol) eventGrounding.getTransportProtocol()).getKafkaPort();
+        int port = 0;
+
+        if (isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
+            port = ((JmsTransportProtocol) eventGrounding.getTransportProtocol()).getPort();
+        } else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)){
+            port = ((KafkaTransportProtocol) eventGrounding.getTransportProtocol()).getKafkaPort();
+        } else if (isPrioritized(prioritizedProtocol, MqttTransportProtocol.class)) {
+            port = ((MqttTransportProtocol) eventGrounding.getTransportProtocol()).getPort();
+        }
+
         return host + ":" + port;
     }
 
@@ -60,21 +78,76 @@ public class GroundingService {
         return eventGrounding;
     }
 
-    public static EventGrounding createEventGrounding(String kafkaHost, int kafkaPort, EventSchema eventSchema) {
+    public static EventGrounding createEventGrounding() {
         EventGrounding eventGrounding = new EventGrounding();
-        KafkaTransportProtocol transportProtocol = new KafkaTransportProtocol();
-        transportProtocol.setBrokerHostname(kafkaHost);
-        transportProtocol.setKafkaPort(kafkaPort);
 
-        String topic = "org.apache.streampipes.connect." + UUID.randomUUID().toString();
+        String topic = TOPIC_PREFIX + UUID.randomUUID().toString();
         TopicDefinition topicDefinition = new SimpleTopicDefinition(topic);
-        transportProtocol.setTopicDefinition(topicDefinition);
 
-        eventGrounding.setTransportProtocol(transportProtocol);
+        SpProtocol prioritizedProtocol =
+                BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
+
+        if (isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
+            eventGrounding.setTransportProtocol(
+                    makeJmsTransportProtocol(
+                            BackendConfig.INSTANCE.getJmsHost(),
+                            BackendConfig.INSTANCE.getJmsPort(),
+                            topicDefinition));
+        } else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)){
+            eventGrounding.setTransportProtocol(
+                    makeKafkaTransportProtocol(
+                            BackendConfig.INSTANCE.getKafkaHost(),
+                            BackendConfig.INSTANCE.getKafkaPort(),
+                            topicDefinition));
+        } else if (isPrioritized(prioritizedProtocol, MqttTransportProtocol.class)) {
+            eventGrounding.setTransportProtocol(
+                    makeMqttTransportProtocol(
+                            BackendConfig.INSTANCE.getMqttHost(),
+                            BackendConfig.INSTANCE.getMqttPort(),
+                            topicDefinition));
+        }
+
         eventGrounding.setTransportFormats(Collections
                 .singletonList(TransportFormatGenerator.getTransportFormat()));
 
-
         return eventGrounding;
     }
+
+    public static Boolean isPrioritized(SpProtocol prioritizedProtocol,
+                                         Class<?> protocolClass) {
+        return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName());
+    }
+
+    private static JmsTransportProtocol makeJmsTransportProtocol(String hostname, Integer port,
+                                                          TopicDefinition topicDefinition) {
+        JmsTransportProtocol transportProtocol = new JmsTransportProtocol();
+        transportProtocol.setPort(port);
+        fillTransportProtocol(transportProtocol, hostname, topicDefinition);
+
+        return transportProtocol;
+    }
+
+    private static MqttTransportProtocol makeMqttTransportProtocol(String hostname, Integer port,
+                                                                 TopicDefinition topicDefinition) {
+        MqttTransportProtocol transportProtocol = new MqttTransportProtocol();
+        transportProtocol.setPort(port);
+        fillTransportProtocol(transportProtocol, hostname, topicDefinition);
+
+        return transportProtocol;
+    }
+
+    private static KafkaTransportProtocol makeKafkaTransportProtocol(String hostname, Integer port,
+                                                              TopicDefinition topicDefinition) {
+        KafkaTransportProtocol transportProtocol = new KafkaTransportProtocol();
+        transportProtocol.setKafkaPort(port);
+        fillTransportProtocol(transportProtocol, hostname, topicDefinition);
+
+        return transportProtocol;
+    }
+
+    private static void fillTransportProtocol(TransportProtocol protocol, String hostname,
+                                       TopicDefinition topicDefinition) {
+        protocol.setBrokerHostname(hostname);
+        protocol.setTopicDefinition(topicDefinition);
+    }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
index 0be5dd9..52bbd9f 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
@@ -22,6 +22,6 @@ import java.util.Map;
 
 public interface AdapterPipelineElement {
 
-    public Map<String, Object> process(Map<String, Object> event);
+    Map<String, Object> process(Map<String, Object> event);
 
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
new file mode 100644
index 0000000..27439fe
--- /dev/null
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.adapter.preprocessing.elements;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.connect.adapter.util.TransportFormatSelector;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.grounding.TransportFormat;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+
+import java.util.Map;
+import java.util.function.Supplier;
+
+public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> implements AdapterPipelineElement {
+
+  protected AdapterDescription adapterDescription;
+  protected SpDataFormatDefinition dataFormatDefinition;
+
+  protected T protocol;
+  private Class<T> protocolClass;
+
+  private EventProducer<T> producer;
+
+  public SendToBrokerAdapterSink(AdapterDescription adapterDescription,
+                                 Supplier<EventProducer<T>> producerSupplier,
+                                 Class<T> protocolClass) {
+    this.adapterDescription = adapterDescription;
+    this.producer = producerSupplier.get();
+
+    this.protocol = protocolClass.cast(adapterDescription
+            .getEventGrounding()
+            .getTransportProtocol());
+
+    if ("true".equals(System.getenv("SP_DEBUG"))) {
+      modifyProtocolForDebugging();
+    }
+
+    TransportFormat transportFormat = adapterDescription
+            .getEventGrounding()
+            .getTransportFormats()
+            .get(0);
+
+    this.dataFormatDefinition =
+            new TransportFormatSelector(transportFormat).getDataFormatDefinition();
+
+    try {
+      producer.connect(protocol);
+    } catch (SpRuntimeException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public Map<String, Object> process(Map<String, Object> event) {
+    try {
+      if (event != null) {
+        if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
+          event.put("internal_t2", System.currentTimeMillis());
+        }
+        sendToBroker(dataFormatDefinition.fromMap(event));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  protected void sendToBroker(byte[] event) throws Exception {
+    producer.publish(event);
+  }
+
+  protected void modifyProtocolForDebugging() {
+
+  }
+
+  public void changeTransportProtocol(T transportProtocol) {
+    try {
+      producer.disconnect();
+      producer.connect(transportProtocol);
+    } catch (SpRuntimeException e) {
+      e.printStackTrace();
+    }
+  }
+
+}
+
+
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
similarity index 51%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
index 0be5dd9..8b57211 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java
@@ -15,13 +15,22 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-package org.apache.streampipes.connect.adapter.model.pipeline;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 
-import java.util.Map;
+public class SendToJmsAdapterSink extends SendToBrokerAdapterSink<JmsTransportProtocol> // broker sink bound to JmsTransportProtocol
+        implements AdapterPipelineElement {
 
-public interface AdapterPipelineElement {
-
-    public Map<String, Object> process(Map<String, Object> event);
+    public SendToJmsAdapterSink(AdapterDescription adapterDescription) {
+        super(adapterDescription, ActiveMQPublisher::new, JmsTransportProtocol.class); // passes the ActiveMQ publisher factory and the protocol class to the base sink
+    }
 
+    @Override
+    public void modifyProtocolForDebugging() { // presumably invoked by the base sink in debug mode -- TODO confirm in SendToBrokerAdapterSink
+        this.protocol.setBrokerHostname("localhost"); // debugging override: target a broker on localhost
+    }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
index 7670c66..434dd9c 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java
@@ -15,68 +15,24 @@
  * limitations under the License.
  *
  */
-
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
-import org.apache.streampipes.connect.adapter.util.TransportFormatSelector;
-import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportFormat;
-import org.apache.streampipes.model.grounding.TransportProtocol;
-
-import java.util.Map;
 
-public class SendToKafkaAdapterSink implements AdapterPipelineElement  {
-    private SpKafkaProducer producer;
-    private SpDataFormatDefinition dataFormatDefinition;
+public class SendToKafkaAdapterSink extends SendToBrokerAdapterSink<KafkaTransportProtocol> // broker sink bound to KafkaTransportProtocol
+        implements AdapterPipelineElement  {
 
-    // TODO Handle multiple Event Groundings and define what happens when none is provided
     public SendToKafkaAdapterSink(AdapterDescription adapterDescription) {
-        producer = new SpKafkaProducer();
-
-        KafkaTransportProtocol kafkaTransportProtocol = (KafkaTransportProtocol) adapterDescription
-                .getEventGrounding()
-                .getTransportProtocol();
-
-        if ("true".equals(System.getenv("SP_DEBUG"))) {
-            kafkaTransportProtocol.setBrokerHostname("localhost");
-            kafkaTransportProtocol.setKafkaPort(9094);
-        }
-
-        TransportFormat transportFormat =
-                adapterDescription.getEventGrounding().getTransportFormats().get(0);
-
-        this.dataFormatDefinition =
-                new TransportFormatSelector(transportFormat).getDataFormatDefinition();
-
-        producer.connect(kafkaTransportProtocol);
+        super(adapterDescription, SpKafkaProducer::new, KafkaTransportProtocol.class); // setup formerly done inline here is delegated to the base sink
     }
 
     @Override
-    public Map<String, Object> process(Map<String, Object> event) {
-        try {
-            if (event != null) {
-
-                // TODO remove, just for performance tests
-                if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
-                    event.put("internal_t2", System.currentTimeMillis());
-                }
-
-                producer.publish(dataFormatDefinition.fromMap(event));
-            }
-        } catch (SpRuntimeException e) {
-            e.printStackTrace();
-        }
-
-        return null;
-    }
-
-    public void changeTransportProtocol(TransportProtocol transportProtocol) {
-        producer.disconnect();
-        producer.connect((KafkaTransportProtocol) transportProtocol);
+    public void modifyProtocolForDebugging() { // same localhost:9094 override that the removed inline SP_DEBUG branch applied
+        this.protocol.setBrokerHostname("localhost");
+        this.protocol.setKafkaPort(9094);
     }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
similarity index 51%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
index 0be5dd9..d1e9048 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java
@@ -15,13 +15,22 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
-package org.apache.streampipes.connect.adapter.model.pipeline;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipelineElement;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 
-import java.util.Map;
+public class SendToMqttAdapterSink extends SendToBrokerAdapterSink<MqttTransportProtocol> // broker sink bound to MqttTransportProtocol
+        implements AdapterPipelineElement {
 
-public interface AdapterPipelineElement {
-
-    public Map<String, Object> process(Map<String, Object> event);
+    public SendToMqttAdapterSink(AdapterDescription adapterDescription) {
+        super(adapterDescription, MqttPublisher::new, MqttTransportProtocol.class); // passes the MQTT publisher factory and the protocol class to the base sink
+    }
 
+    @Override
+    public void modifyProtocolForDebugging() { // presumably invoked by the base sink in debug mode -- TODO confirm in SendToBrokerAdapterSink
+        this.protocol.setBrokerHostname("localhost"); // debugging override: target a broker on localhost
+    }
 }
diff --git a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/GroundingServiceTest.java b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/GroundingServiceTest.java
index 80a253a..d188583 100644
--- a/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/GroundingServiceTest.java
+++ b/streampipes-connect/src/test/java/org/apache/streampipes/connect/adapter/GroundingServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.adapter;
 
+import org.apache.streampipes.config.backend.SpProtocol;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -119,11 +120,10 @@ public class GroundingServiceTest {
         BackendConfig backendConfig = mock(BackendConfig.INSTANCE.getClass());
         when(backendConfig.getMessagingSettings()).thenReturn(MessagingSettings.fromDefault());
         Whitebox.setInternalState(BackendConfig.class, "INSTANCE", backendConfig);
+        EventGrounding eventGrounding = GroundingService.createEventGrounding();
 
-        EventGrounding eventGrounding = GroundingService.createEventGrounding("localhost", 1, null);
-
-        assertEquals("localhost", eventGrounding.getTransportProtocol().getBrokerHostname());
-        assertEquals(1, ((KafkaTransportProtocol)eventGrounding.getTransportProtocol()).getKafkaPort());
+//        assertEquals("localhost", eventGrounding.getTransportProtocol().getBrokerHostname());
+//        assertEquals(0, ((KafkaTransportProtocol)eventGrounding.getTransportProtocol()).getKafkaPort());
         assertTrue(eventGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName().startsWith("org.apache.streampipes.connect"));
 
     }
diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
index 64e0f88..a32aaf2 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
@@ -59,7 +59,8 @@ public class ActiveMQConsumer extends ActiveMQConnectionProvider implements
   @Override
   public void connect(JmsTransportProtocol protocolSettings, InternalEventProcessor<byte[]>
           eventProcessor) throws SpRuntimeException {
-    String url = protocolSettings.getBrokerHostname() + ":" + protocolSettings.getPort();
+    String url = ActiveMQUtils.makeActiveMqUrl(protocolSettings);
+
     try {
       this.eventProcessor = eventProcessor;
       session = startJmsConnection(url).createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
index db610fc..ad7d4b1 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
@@ -43,17 +43,29 @@ public class ActiveMQPublisher implements EventProducer<JmsTransportProtocol> {
   private Session session;
   private MessageProducer producer;
 
-  private Boolean connected;
+  private Boolean connected = false;
 
   public ActiveMQPublisher() {
 
   }
 
-  // TODO backwards compatibility, remove later
-  public ActiveMQPublisher(String url, String topic) {
+  // @Deprecated removed: with the legacy (url, topic) constructor commented out, the annotation would attach to the (host, port, topic) constructor below.
+//  public ActiveMQPublisher(String url, String topic) {
+//    JmsTransportProtocol protocol = new JmsTransportProtocol();
+//    protocol.setBrokerHostname(url.substring(0, url.lastIndexOf(":")));
+//    protocol.setPort(Integer.parseInt(url.substring(url.lastIndexOf(":") + 1, url.length())));
+//    protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
+//    try {
+//      connect(protocol);
+//    } catch (SpRuntimeException e) {
+//      e.printStackTrace();
+//    }
+//  }
+
+  public ActiveMQPublisher(String host, int port, String topic) {
     JmsTransportProtocol protocol = new JmsTransportProtocol();
-    protocol.setBrokerHostname(url.substring(0, url.lastIndexOf(":")));
-    protocol.setPort(Integer.parseInt(url.substring(url.lastIndexOf(":") + 1, url.length())));
+    protocol.setBrokerHostname(host);
+    protocol.setPort(port);
     protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
     try {
       connect(protocol);
@@ -69,7 +81,7 @@ public class ActiveMQPublisher implements EventProducer<JmsTransportProtocol> {
   @Override
   public void connect(JmsTransportProtocol protocolSettings) throws SpRuntimeException {
 
-    String url = protocolSettings.getBrokerHostname() + ":" + protocolSettings.getPort();
+    String url = ActiveMQUtils.makeActiveMqUrl(protocolSettings);
     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
 
     boolean co = false;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java
similarity index 65%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
copy to streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java
index 0be5dd9..0a66af0 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipelineElement.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQUtils.java
@@ -1,4 +1,4 @@
-/*
+package org.apache.streampipes.messaging.jms;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,12 +16,14 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.model.pipeline;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 
-import java.util.Map;
+public class ActiveMQUtils { // static helper for building the ActiveMQ broker URL from JMS protocol settings
 
-public interface AdapterPipelineElement {
-
-    public Map<String, Object> process(Map<String, Object> event);
+    private static final String TCP_PROTOCOL = "tcp://";
+    private static final String COLON = ":";
 
+    public static String makeActiveMqUrl(JmsTransportProtocol protocol) {
+        return TCP_PROTOCOL + protocol.getBrokerHostname() + COLON + protocol.getPort(); // yields tcp://<brokerHostname>:<port>
+    }
 }
diff --git a/streampipes-messaging-mqtt/pom.xml b/streampipes-messaging-mqtt/pom.xml
index f60b471..39f493b 100644
--- a/streampipes-messaging-mqtt/pom.xml
+++ b/streampipes-messaging-mqtt/pom.xml
@@ -39,6 +39,10 @@
             <groupId>org.fusesource.mqtt-client</groupId>
             <artifactId>mqtt-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.fusesource.hawtbuf</groupId>
+            <artifactId>hawtbuf</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
index 458514d..d2458a7 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
@@ -31,19 +31,37 @@ public class MqttConsumer extends AbstractMqttConnector implements EventConsumer
     try {
       this.createBrokerConnection(protocolSettings);
       Topic[] topics = {new Topic(protocolSettings.getTopicDefinition().getActualTopicName(), QoS.AT_LEAST_ONCE)};
-      byte[] qoses = connection.subscribe(topics);
+      connection.subscribe(topics);
+      new Thread(new ConsumerThread(eventProcessor)).start();
 
-      while (connected) {
-        Message message = connection.receive();
-        byte[] payload = message.getPayload();
-        eventProcessor.onEvent(payload);
-        message.ack();
-      }
     } catch (Exception e) {
       throw new SpRuntimeException(e);
     }
   }
 
+  private class ConsumerThread implements Runnable { // receive loop run on its own thread; `connected` and `connection` are not declared here and resolve to the enclosing consumer
+
+    private final InternalEventProcessor<byte[]> eventProcessor;
+
+    public ConsumerThread(InternalEventProcessor<byte[]> eventProcessor) {
+      this.eventProcessor = eventProcessor;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (connected) { // loop ends once the `connected` flag is false
+          Message message = connection.receive();
+          byte[] payload = message.getPayload();
+          eventProcessor.onEvent(payload); // hand the raw payload to the callback
+          message.ack(); // acknowledged only after onEvent() has returned
+        }
+      } catch (Exception e) {
+        e.printStackTrace(); // any exception leaves the loop, so this thread stops consuming
+      }
+    }
+  }
+
   @Override
   public void disconnect() throws SpRuntimeException {
     try {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
index ee0ca20..c1f5e5b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.model.util;
 
+import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.output.*;
 import org.apache.streampipes.model.staticproperty.*;
 import org.slf4j.Logger;
@@ -35,14 +36,6 @@ import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescriptio
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.grounding.JmsTransportProtocol;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
-import org.apache.streampipes.model.grounding.TopicDefinition;
-import org.apache.streampipes.model.grounding.TransportFormat;
-import org.apache.streampipes.model.grounding.TransportProtocol;
-import org.apache.streampipes.model.grounding.WildcardTopicDefinition;
-import org.apache.streampipes.model.grounding.WildcardTopicMapping;
 import org.apache.streampipes.model.quality.Accuracy;
 import org.apache.streampipes.model.quality.EventPropertyQualityDefinition;
 import org.apache.streampipes.model.quality.EventPropertyQualityRequirement;
@@ -137,6 +130,8 @@ public class Cloner {
       return new KafkaTransportProtocol((KafkaTransportProtocol) protocol);
     } else if (protocol instanceof JmsTransportProtocol){
       return new JmsTransportProtocol((JmsTransportProtocol) protocol);
+    } else if (protocol instanceof MqttTransportProtocol) {
+      return new MqttTransportProtocol((MqttTransportProtocol) protocol);
     } else {
       LOG.error("Could not clone protocol of type {}", protocol.getClass().getCanonicalName());
       return protocol;
diff --git a/streampipes-pipeline-management/pom.xml b/streampipes-pipeline-management/pom.xml
index ed38dc8..9800aa8 100644
--- a/streampipes-pipeline-management/pom.xml
+++ b/streampipes-pipeline-management/pom.xml
@@ -69,6 +69,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-messaging-mqtt</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-messaging-kafka</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index 018285e..ec3a891 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -19,12 +19,14 @@
 package org.apache.streampipes.manager.matching;
 
 import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpProtocol;
 import org.apache.streampipes.manager.util.TopicGenerator;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 
 import java.util.List;
@@ -33,10 +35,13 @@ import java.util.Set;
 public class ProtocolSelector extends GroundingSelector {
 
     private String outputTopic;
+    private List<SpProtocol> prioritizedProtocols;
 
     public ProtocolSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
         super(source, targets);
         this.outputTopic = TopicGenerator.generateRandomTopic();
+        this.prioritizedProtocols =
+                BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols();
     }
 
     public TransportProtocol getPreferredProtocol() {
@@ -45,17 +50,35 @@ public class ProtocolSelector extends GroundingSelector {
                     .getEventGrounding()
                     .getTransportProtocol();
         } else {
-            if (supportsProtocol(KafkaTransportProtocol.class)) {
-                return kafkaTopic();
-            } else if (supportsProtocol(JmsTransportProtocol.class)) {
-                return new JmsTransportProtocol(BackendConfig.INSTANCE.getJmsHost(),
-                        BackendConfig.INSTANCE.getJmsPort(),
-                        outputTopic);
+            for(SpProtocol prioritizedProtocol: prioritizedProtocols) {
+                if (prioritizedProtocol.getProtocolClass().equals(KafkaTransportProtocol.class.getCanonicalName()) &&
+                        supportsProtocol(KafkaTransportProtocol.class)) {
+                    return kafkaTopic();
+                }
+                else if (prioritizedProtocol.getProtocolClass().equals(JmsTransportProtocol.class.getCanonicalName()) &&
+                        supportsProtocol(JmsTransportProtocol.class)) {
+                    return jmsTopic();
+                } else if (prioritizedProtocol.getProtocolClass().equals(MqttTransportProtocol.class.getCanonicalName()) &&
+                        supportsProtocol(MqttTransportProtocol.class)) {
+                    return mqttTopic();
+                }
             }
         }
         return kafkaTopic();
     }
 
+    private TransportProtocol mqttTopic() { // MQTT protocol built from the backend's MQTT host/port and this selector's outputTopic
+        return new MqttTransportProtocol(BackendConfig.INSTANCE.getMqttHost(),
+                BackendConfig.INSTANCE.getMqttPort(),
+                outputTopic);
+    }
+
+    private TransportProtocol jmsTopic() { // JMS protocol built from the backend's JMS host/port and this selector's outputTopic
+        return new JmsTransportProtocol(BackendConfig.INSTANCE.getJmsHost(),
+                BackendConfig.INSTANCE.getJmsPort(),
+                outputTopic);
+    }
+
     private TransportProtocol kafkaTopic() {
         return new KafkaTransportProtocol(BackendConfig.INSTANCE.getKafkaHost(),
                 BackendConfig.INSTANCE.getKafkaPort(),
@@ -74,7 +97,7 @@ public class ProtocolSelector extends GroundingSelector {
                         .getSupportedGrounding()
                         .getTransportProtocols()
                         .stream()
-                        .anyMatch(p -> protocol.isInstance(p)));
+                        .anyMatch(protocol::isInstance));
 
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index 6c0e0e6..a40ac03 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,7 +17,11 @@
  */
 package org.apache.streampipes.manager.runtime;
 
+import com.google.inject.internal.cglib.core.$LocalVariablesSorter;
+import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
+import org.apache.streampipes.messaging.mqtt.MqttConsumer;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -32,12 +36,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 public enum PipelineElementRuntimeInfoFetcher {
-
   INSTANCE;
 
   Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
 
-  private Map<String, SpDataFormatConverter> converterMap;
+  private final int FETCH_INTERVAL_MS = 300;
+  private final Map<String, SpDataFormatConverter> converterMap;
 
   PipelineElementRuntimeInfoFetcher() {
     this.converterMap = new HashMap<>();
@@ -47,25 +51,46 @@ public enum PipelineElementRuntimeInfoFetcher {
 
     if (spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) {
       return getLatestEventFromKafka(spDataStream);
-    } else {
+    }
+    else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol){
       return getLatestEventFromJms(spDataStream);
+    } else {
+      return getLatestEventFromMqtt(spDataStream);
     }
+  }
 
+  private TransportFormat getTransportFormat(SpDataStream spDataStream) { // only the first listed transport format is used
+    return spDataStream.getEventGrounding().getTransportFormats().get(0);
+  }
+
+  private String getOutputTopic(SpDataStream spDataStream) { // actual topic name from the stream's transport protocol
+    return spDataStream
+            .getEventGrounding()
+            .getTransportProtocol()
+            .getTopicDefinition()
+            .getActualTopicName();
   }
 
   private String getLatestEventFromJms(SpDataStream spDataStream) throws SpRuntimeException {
     final String[] result = {null};
-    final String topic = getOutputTopic(spDataStream);
-    if (!converterMap.containsKey(topic)) {
-      this.converterMap.put(topic,
+    String jmsTopic = getOutputTopic(spDataStream);
+    JmsTransportProtocol protocol = (JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
+
+    // Change jms config when running in development mode
+    if ("true".equals(System.getenv("SP_DEBUG"))) {
+      protocol.setBrokerHostname("localhost");
+    }
+    if (!converterMap.containsKey(jmsTopic)) {
+      this.converterMap.put(jmsTopic,
               new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
     }
+
     ActiveMQConsumer consumer = new ActiveMQConsumer();
     consumer.connect((JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol(), new InternalEventProcessor<byte[]>() {
       @Override
       public void onEvent(byte[] event) {
         try {
-          result[0] = converterMap.get(topic).convert(event);
+          result[0] = converterMap.get(jmsTopic).convert(event);
           consumer.disconnect();
         } catch (SpRuntimeException e) {
           e.printStackTrace();
@@ -75,7 +100,7 @@ public enum PipelineElementRuntimeInfoFetcher {
 
     while (result[0] == null) {
       try {
-        Thread.sleep(300);
+        Thread.sleep(FETCH_INTERVAL_MS);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
@@ -84,16 +109,42 @@ public enum PipelineElementRuntimeInfoFetcher {
     return result[0];
   }
 
-  private TransportFormat getTransportFormat(SpDataStream spDataStream) {
-    return spDataStream.getEventGrounding().getTransportFormats().get(0);
-  }
+  /** Polls every FETCH_INTERVAL_MS until an event from the stream's MQTT topic has been converted, then returns it;
+   *  returns early (normally with null) if the polling thread is interrupted. There is no timeout. */
+  private String getLatestEventFromMqtt(SpDataStream spDataStream) throws SpRuntimeException {
+    final String[] result = {null};
+    String mqttTopic = getOutputTopic(spDataStream);
+    MqttTransportProtocol protocol = (MqttTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
-  private String getOutputTopic(SpDataStream spDataStream) {
-    return spDataStream
-            .getEventGrounding()
-            .getTransportProtocol()
-            .getTopicDefinition()
-            .getActualTopicName();
+    // Change mqtt config when running in development mode
+    if ("true".equals(System.getenv("SP_DEBUG"))) {
+      protocol.setBrokerHostname("localhost");
+    }
+    if (!converterMap.containsKey(mqttTopic)){ // one converter per topic, created on first use
+      this.converterMap.put(mqttTopic,
+              new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
+    }
+    MqttConsumer mqttConsumer = new MqttConsumer();
+    mqttConsumer.connect(protocol, new InternalEventProcessor<byte[]>() {
+      @Override
+      public void onEvent(byte[] event) {
+        try {
+          result[0] = converterMap.get(mqttTopic).convert(event);
+          mqttConsumer.disconnect(); // a single event is enough
+        } catch (SpRuntimeException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    while (result[0] == null) {
+      try {
+        Thread.sleep(FETCH_INTERVAL_MS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
+        break; // stop waiting; the caller gets whatever result is set so far
+      }
+    }
+    return result[0];
   }
 
   private String getLatestEventFromKafka(SpDataStream spDataStream) throws SpRuntimeException {
@@ -129,7 +180,7 @@ public enum PipelineElementRuntimeInfoFetcher {
     long timeout = 0;
     while (result[0] == null && timeout < 6000) {
       try {
-        Thread.sleep(300);
+        Thread.sleep(FETCH_INTERVAL_MS);
         timeout = timeout + 300;
       } catch (InterruptedException e) {
         e.printStackTrace();
diff --git a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
index 07fa27c..d9720ad 100644
--- a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
+++ b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.html
@@ -73,6 +73,26 @@
                 </button>
             </div>
         </div>
+    </div>
+    <div fxFlex="100" fxLayout="column" fxLayoutAlign="start stretch" style="margin-top:40px;">
+        <div fxFlex="100" class="assemblyOptions sp-blue-bg" style="padding:5px;">
+            <div fxLayout="row" fxLayoutAlign="start center" fxFlex="100">
+                <h4>Protocols</h4>
+                <span flex></span>
+            </div>
+        </div>
+        <div fxFlex="100" fxLayout="column" fxLayoutAlign="start start" class="sp-blue-border page-container-padding-inner">
+            <div cdkDropList class="data-format-list" (cdkDropListDropped)="dropProtocol($event)"
+                 *ngIf="loadingCompleted">
+                <div class="data-format-box" *ngFor="let protocol of messagingSettings.prioritizedProtocols" cdkDrag>
+                    {{protocol}}
+                </div>
+            </div>
+            <div fxLayoutAlign="end center">
+                <button mat-raised-button color="primary" type="submit" class="md-raised md-primary submit-button" (click)="updateMessagingSettings()">Update
+                </button>
+            </div>
+        </div>
 
     </div>
 </div>
\ No newline at end of file
diff --git a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.ts b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.ts
index 4585f81..99617d7 100644
--- a/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.ts
+++ b/ui/src/app/configuration/messaging-configuration/messaging-configuration.component.ts
@@ -53,4 +53,8 @@ export class MessagingConfigurationComponent {
     drop(event: CdkDragDrop<string[]>) {
         moveItemInArray(this.messagingSettings.prioritizedFormats, event.previousIndex, event.currentIndex);
     }
+
+    dropProtocol(event: CdkDragDrop<string[]>) { // drop handler: moves the dragged entry of prioritizedProtocols from previousIndex to currentIndex
+        moveItemInArray(this.messagingSettings.prioritizedProtocols, event.previousIndex, event.currentIndex);
+    }
 }
\ No newline at end of file
diff --git a/ui/src/app/configuration/shared/messaging-settings.model.ts b/ui/src/app/configuration/shared/messaging-settings.model.ts
index c29ee1a..501e63e 100644
--- a/ui/src/app/configuration/shared/messaging-settings.model.ts
+++ b/ui/src/app/configuration/shared/messaging-settings.model.ts
@@ -24,4 +24,5 @@ export interface MessagingSettings {
     acks: number;
 
    prioritizedFormats: [string];
+   prioritizedProtocols: [string];
 }
\ No newline at end of file