You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/03/30 20:23:02 UTC

[incubator-streampipes] branch dev updated: Add streampipes-messaging-mqtt module

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new aba7efc  Add streampipes-messaging-mqtt module
     new 4c3d6cb  Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
aba7efc is described below

commit aba7efc1529895d8522e3e1f483da6ad36c32c26
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Mar 30 22:22:31 2020 +0200

    Add streampipes-messaging-mqtt module
---
 pom.xml                                            |  7 +++
 streampipes-messaging-mqtt/pom.xml                 | 46 +++++++++++++++
 .../messaging/mqtt/AbstractMqttConnector.java      | 41 +++++++++++++
 .../streampipes/messaging/mqtt/MqttConsumer.java   | 62 +++++++++++++++++++
 .../streampipes/messaging/mqtt/MqttPublisher.java  | 69 ++++++++++++++++++++++
 .../streampipes/messaging/mqtt/SpMqttProtocol.java | 43 ++++++++++++++
 .../messaging/mqtt/SpMqttProtocolFactory.java      | 40 +++++++++++++
 .../model/grounding/MqttTransportProtocol.java     | 61 +++++++++++++++++++
 .../model/grounding/TransportProtocol.java         |  4 +-
 .../jsonld/CustomAnnotationProvider.java           |  4 +-
 .../apache/streampipes/vocabulary/StreamPipes.java |  2 +
 11 files changed, 376 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index 40775a4..fedc9a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@
         <log4j.version>2.12.1</log4j.version>
         <logback-classic.version>1.2.3</logback-classic.version>
         <maven-invoker.version>2.2</maven-invoker.version>
+        <mqtt-client.version>1.12</mqtt-client.version>
         <objenesis.version>2.5.1</objenesis.version>
         <okio.version>1.16.0</okio.version>
         <okhttp.version>3.12.2</okhttp.version>
@@ -515,6 +516,11 @@
                 <version>${elasticsearch.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.fusesource.mqtt-client</groupId>
+                <artifactId>mqtt-client</artifactId>
+                <version>${mqtt-client.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.jboss.logging</groupId>
                 <artifactId>jboss-logging</artifactId>
                 <version>${jboss-logging.version}</version>
@@ -829,6 +835,7 @@
         <module>streampipes-connect-container-master</module>
         <module>streampipes-connect-container-worker</module>
         <module>streampipes-container-base</module>
+        <module>streampipes-messaging-mqtt</module>
     </modules>
 
     <profiles>
diff --git a/streampipes-messaging-mqtt/pom.xml b/streampipes-messaging-mqtt/pom.xml
new file mode 100644
index 0000000..d1dd5aa
--- /dev/null
+++ b/streampipes-messaging-mqtt/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~   Licensed to the Apache Software Foundation (ASF) under one or more
+  ~   contributor license agreements.  See the NOTICE file distributed with
+  ~   this work for additional information regarding copyright ownership.
+  ~   The ASF licenses this file to You under the Apache License, Version 2.0
+  ~   (the "License"); you may not use this file except in compliance with
+  ~   the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~   Unless required by applicable law or agreed to in writing, software
+  ~   distributed under the License is distributed on an "AS IS" BASIS,
+  ~   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~   See the License for the specific language governing permissions and
+  ~   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streampipes-parent</artifactId>
+        <groupId>org.apache.streampipes</groupId>
+        <version>0.65.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streampipes-messaging-mqtt</artifactId>
+
+    <dependencies>
+        <!-- StreamPipes dependencies -->
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-messaging</artifactId>
+            <version>0.65.1-SNAPSHOT</version>
+        </dependency>
+
+        <!-- External dependencies -->
+        <dependency>
+            <groupId>org.fusesource.mqtt-client</groupId>
+            <artifactId>mqtt-client</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
new file mode 100644
index 0000000..c34716e
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -0,0 +1,41 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+
+public class AbstractMqttConnector {
+
+  protected MQTT mqtt;
+  protected BlockingConnection connection;
+  protected Boolean connected = false;
+
+  protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception {
+    this.mqtt = new MQTT();
+    this.mqtt.setHost(makeBrokerUrl(protocolSettings));
+    this.connection = mqtt.blockingConnection();
+    this.connection.connect();
+    this.connected = true;
+  }
+
+  private String makeBrokerUrl(MqttTransportProtocol protocolSettings) {
+    return "tcp://" + protocolSettings.getBrokerHostname() + ":" + protocolSettings.getPort();
+  }
+
+}
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
new file mode 100644
index 0000000..458514d
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
@@ -0,0 +1,62 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventConsumer;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+
+public class MqttConsumer extends AbstractMqttConnector implements EventConsumer<MqttTransportProtocol> {
+
+  @Override
+  public void connect(MqttTransportProtocol protocolSettings, InternalEventProcessor<byte[]> eventProcessor) throws SpRuntimeException {
+    try {
+      this.createBrokerConnection(protocolSettings);
+      Topic[] topics = {new Topic(protocolSettings.getTopicDefinition().getActualTopicName(), QoS.AT_LEAST_ONCE)};
+      byte[] qoses = connection.subscribe(topics);
+
+      while (connected) {
+        Message message = connection.receive();
+        byte[] payload = message.getPayload();
+        eventProcessor.onEvent(payload);
+        message.ack();
+      }
+    } catch (Exception e) {
+      throw new SpRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void disconnect() throws SpRuntimeException {
+    try {
+      this.connection.disconnect();
+    } catch (Exception e) {
+      throw new SpRuntimeException(e);
+    } finally {
+      this.connected = false;
+    }
+  }
+
+  @Override
+  public Boolean isConnected() {
+    return this.connected;
+  }
+}
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
new file mode 100644
index 0000000..04754d6
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
@@ -0,0 +1,69 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttPublisher extends AbstractMqttConnector implements EventProducer<MqttTransportProtocol> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class);
+
+  private String currentTopic;
+
+  @Override
+  public void connect(MqttTransportProtocol protocolSettings) throws SpRuntimeException {
+    try {
+      this.createBrokerConnection(protocolSettings);
+      this.currentTopic = protocolSettings.getTopicDefinition().getActualTopicName();
+    } catch (Exception e) {
+      throw new SpRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void publish(byte[] event) {
+    if (connected && currentTopic != null) {
+      try {
+        this.connection.publish(currentTopic, event, QoS.AT_LEAST_ONCE, false);
+      } catch (Exception e) {
+        // TODO exception handling once system-wide logging is implemented
+        LOG.error(e.getMessage());
+      }
+    }
+  }
+
+  @Override
+  public void disconnect() throws SpRuntimeException {
+    try {
+      this.connection.disconnect();
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      this.connected = false;
+    }
+  }
+
+  @Override
+  public Boolean isConnected() {
+    return connected;
+  }
+}
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java
new file mode 100644
index 0000000..9c259bf
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java
@@ -0,0 +1,43 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.messaging.EventConsumer;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.messaging.SpProtocolDefinition;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
+public class SpMqttProtocol implements SpProtocolDefinition<MqttTransportProtocol> {
+
+  private EventConsumer<MqttTransportProtocol> mqttConsumer;
+  private EventProducer<MqttTransportProtocol> mqttProducer;
+
+  public SpMqttProtocol() {
+    this.mqttConsumer = new MqttConsumer();
+    this.mqttProducer = new MqttPublisher();
+  }
+
+  @Override
+  public EventConsumer<MqttTransportProtocol> getConsumer() {
+    return this.mqttConsumer;
+  }
+
+  @Override
+  public EventProducer<MqttTransportProtocol> getProducer() {
+    return this.mqttProducer;
+  }
+}
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java
new file mode 100644
index 0000000..dea95ab
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java
@@ -0,0 +1,40 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.messaging.SpProtocolDefinition;
+import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+
+public class SpMqttProtocolFactory extends SpProtocolDefinitionFactory<MqttTransportProtocol> {
+  
+  @Override
+  public TransportProtocol getTransportProtocol() {
+    return new MqttTransportProtocol();
+  }
+
+  @Override
+  public String getTransportProtocolClass() {
+    return MqttTransportProtocol.class.getCanonicalName();
+  }
+
+  @Override
+  public SpProtocolDefinition<MqttTransportProtocol> createInstance() {
+    return new SpMqttProtocol();
+  }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/MqttTransportProtocol.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/MqttTransportProtocol.java
new file mode 100644
index 0000000..95b30bd
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/MqttTransportProtocol.java
@@ -0,0 +1,61 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.streampipes.model.grounding;
+
+import io.fogsy.empire.annotations.RdfProperty;
+import io.fogsy.empire.annotations.RdfsClass;
+import org.apache.streampipes.vocabulary.StreamPipes;
+
+import javax.persistence.Entity;
+
+@RdfsClass(StreamPipes.MQTT_TRANSPORT_PROTOCOL)
+@Entity
+public class MqttTransportProtocol extends TransportProtocol {
+
+  @RdfProperty(StreamPipes.HAS_MQTT_PORT)
+  private int port;
+
+  public MqttTransportProtocol(String hostname, int port, String topicName)
+  {
+    super(hostname, new SimpleTopicDefinition(topicName));
+    this.port = port;
+  }
+
+  public MqttTransportProtocol(MqttTransportProtocol other)
+  {
+    super(other);
+    this.port = other.getPort();
+  }
+
+  public MqttTransportProtocol()
+  {
+    super();
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  @Override
+  public String toString() {
+    return getBrokerHostname() + ":" + getPort();
+  }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java
index 0262251..3d65d45 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java
@@ -48,10 +48,10 @@ public abstract class TransportProtocol extends UnnamedStreamPipesEntity {
 		super();
 	}
 	
-	public TransportProtocol(String uri, TopicDefinition topicDefinition)
+	public TransportProtocol(String hostname, TopicDefinition topicDefinition)
 	{
 		super();
-		this.brokerHostname = uri;
+		this.brokerHostname = hostname;
 		this.topicDefinition = topicDefinition;
 	}
 
diff --git a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
index 102ae03..07fcff4 100644
--- a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
+++ b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.serializers.jsonld;
 
+import io.fogsy.empire.core.empire.util.EmpireAnnotationProvider;
 import org.apache.streampipes.model.ApplicationLink;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
@@ -60,6 +61,7 @@ import org.apache.streampipes.model.graph.DataSourceDescription;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
 import org.apache.streampipes.model.grounding.TransportFormat;
 import org.apache.streampipes.model.grounding.TransportProtocol;
@@ -117,7 +119,6 @@ import org.apache.streampipes.model.template.BoundPipelineElement;
 import org.apache.streampipes.model.template.PipelineTemplateDescription;
 import org.apache.streampipes.model.template.PipelineTemplateDescriptionContainer;
 import org.apache.streampipes.model.template.PipelineTemplateInvocation;
-import io.fogsy.empire.core.empire.util.EmpireAnnotationProvider;
 
 import java.lang.annotation.Annotation;
 import java.util.Arrays;
@@ -185,6 +186,7 @@ public class CustomAnnotationProvider implements EmpireAnnotationProvider {
             TransportFormat.class,
             JmsTransportProtocol.class,
             KafkaTransportProtocol.class,
+            MqttTransportProtocol.class,
             TransportProtocol.class,
             DomainStaticProperty.class,
             SupportedProperty.class,
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index 06e8f36..9970a6b 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -111,6 +111,7 @@ public class StreamPipes {
 
   public static final String JMS_TRANSPORT_PROTOCOL = NS + "JmsTransportProtocol";
   public static final String KAFKA_TRANSPORT_PROTOCOL = NS + "KafkaTransportProtocol";
+  public static final String MQTT_TRANSPORT_PROTOCOL = NS + "MqttTransportProtocol";
   public static final String TRANSPORT_FORMAT = NS + "TransportFormat";
   public static final String TRANSPORT_PROTOCOL = NS + "TransportProtocol";
 
@@ -143,6 +144,7 @@ public class StreamPipes {
   public static final String HAS_TRANSPORT_PROTOCOL = NS + "hasTransportProtocol";
   public static final String HAS_TRANSPORT_FORMAT = NS + "hasTransportFormat";
   public static final String JMS_PORT = NS + "jmsPort";
+  public static final String HAS_MQTT_PORT = NS + "hasMqttPort";
 
   public static final String ZOOKEEPER_HOST = NS + "zookeeperHost";
   public static final String ZOOKEEPER_PORT = NS + "zookeeperPort";