You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/03/30 20:23:02 UTC
[incubator-streampipes] branch dev updated: Add
streampipes-messaging-mqtt module
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new aba7efc Add streampipes-messaging-mqtt module
new 4c3d6cb Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
aba7efc is described below
commit aba7efc1529895d8522e3e1f483da6ad36c32c26
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Mar 30 22:22:31 2020 +0200
Add streampipes-messaging-mqtt module
---
pom.xml | 7 +++
streampipes-messaging-mqtt/pom.xml | 46 +++++++++++++++
.../messaging/mqtt/AbstractMqttConnector.java | 41 +++++++++++++
.../streampipes/messaging/mqtt/MqttConsumer.java | 62 +++++++++++++++++++
.../streampipes/messaging/mqtt/MqttPublisher.java | 69 ++++++++++++++++++++++
.../streampipes/messaging/mqtt/SpMqttProtocol.java | 43 ++++++++++++++
.../messaging/mqtt/SpMqttProtocolFactory.java | 40 +++++++++++++
.../model/grounding/MqttTransportProtocol.java | 61 +++++++++++++++++++
.../model/grounding/TransportProtocol.java | 4 +-
.../jsonld/CustomAnnotationProvider.java | 4 +-
.../apache/streampipes/vocabulary/StreamPipes.java | 2 +
11 files changed, 376 insertions(+), 3 deletions(-)
diff --git a/pom.xml b/pom.xml
index 40775a4..fedc9a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@
<log4j.version>2.12.1</log4j.version>
<logback-classic.version>1.2.3</logback-classic.version>
<maven-invoker.version>2.2</maven-invoker.version>
+ <mqtt-client.version>1.12</mqtt-client.version>
<objenesis.version>2.5.1</objenesis.version>
<okio.version>1.16.0</okio.version>
<okhttp.version>3.12.2</okhttp.version>
@@ -515,6 +516,11 @@
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>${mqtt-client.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
<version>${jboss-logging.version}</version>
@@ -829,6 +835,7 @@
<module>streampipes-connect-container-master</module>
<module>streampipes-connect-container-worker</module>
<module>streampipes-container-base</module>
+ <module>streampipes-messaging-mqtt</module>
</modules>
<profiles>
diff --git a/streampipes-messaging-mqtt/pom.xml b/streampipes-messaging-mqtt/pom.xml
new file mode 100644
index 0000000..d1dd5aa
--- /dev/null
+++ b/streampipes-messaging-mqtt/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streampipes-parent</artifactId>
+ <groupId>org.apache.streampipes</groupId>
+ <version>0.65.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streampipes-messaging-mqtt</artifactId>
+
+ <dependencies>
+ <!-- StreamPipes dependencies -->
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-messaging</artifactId>
+ <version>0.65.1-SNAPSHOT</version>
+ </dependency>
+
+ <!-- External dependencies -->
+ <dependency>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
new file mode 100644
index 0000000..c34716e
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+
+public class AbstractMqttConnector {
+
+ protected MQTT mqtt;
+ protected BlockingConnection connection;
+ protected Boolean connected = false;
+
+ protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception {
+ this.mqtt = new MQTT();
+ this.mqtt.setHost(makeBrokerUrl(protocolSettings));
+ this.connection = mqtt.blockingConnection();
+ this.connection.connect();
+ this.connected = true;
+ }
+
+ private String makeBrokerUrl(MqttTransportProtocol protocolSettings) {
+ return "tcp://" + protocolSettings.getBrokerHostname() + ":" + protocolSettings.getPort();
+ }
+
+}
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
new file mode 100644
index 0000000..458514d
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttConsumer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventConsumer;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+
+public class MqttConsumer extends AbstractMqttConnector implements EventConsumer<MqttTransportProtocol> {
+
+ @Override
+ public void connect(MqttTransportProtocol protocolSettings, InternalEventProcessor<byte[]> eventProcessor) throws SpRuntimeException {
+ try {
+ this.createBrokerConnection(protocolSettings);
+ Topic[] topics = {new Topic(protocolSettings.getTopicDefinition().getActualTopicName(), QoS.AT_LEAST_ONCE)};
+ byte[] qoses = connection.subscribe(topics);
+
+ while (connected) {
+ Message message = connection.receive();
+ byte[] payload = message.getPayload();
+ eventProcessor.onEvent(payload);
+ message.ack();
+ }
+ } catch (Exception e) {
+ throw new SpRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void disconnect() throws SpRuntimeException {
+ try {
+ this.connection.disconnect();
+ } catch (Exception e) {
+ throw new SpRuntimeException(e);
+ } finally {
+ this.connected = false;
+ }
+ }
+
+ @Override
+ public Boolean isConnected() {
+ return this.connected;
+ }
+}
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
new file mode 100644
index 0000000..04754d6
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttPublisher extends AbstractMqttConnector implements EventProducer<MqttTransportProtocol> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class);
+
+ private String currentTopic;
+
+ @Override
+ public void connect(MqttTransportProtocol protocolSettings) throws SpRuntimeException {
+ try {
+ this.createBrokerConnection(protocolSettings);
+ this.currentTopic = protocolSettings.getTopicDefinition().getActualTopicName();
+ } catch (Exception e) {
+ throw new SpRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void publish(byte[] event) {
+ if (connected && currentTopic != null) {
+ try {
+ this.connection.publish(currentTopic, event, QoS.AT_LEAST_ONCE, false);
+ } catch (Exception e) {
+ // TODO exception handling once system-wide logging is implemented
+ LOG.error(e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void disconnect() throws SpRuntimeException {
+ try {
+ this.connection.disconnect();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ this.connected = false;
+ }
+ }
+
+ @Override
+ public Boolean isConnected() {
+ return connected;
+ }
+}
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java
new file mode 100644
index 0000000..9c259bf
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocol.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.messaging.EventConsumer;
+import org.apache.streampipes.messaging.EventProducer;
+import org.apache.streampipes.messaging.SpProtocolDefinition;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
+public class SpMqttProtocol implements SpProtocolDefinition<MqttTransportProtocol> {
+
+ private EventConsumer<MqttTransportProtocol> mqttConsumer;
+ private EventProducer<MqttTransportProtocol> mqttProducer;
+
+ public SpMqttProtocol() {
+ this.mqttConsumer = new MqttConsumer();
+ this.mqttProducer = new MqttPublisher();
+ }
+
+ @Override
+ public EventConsumer<MqttTransportProtocol> getConsumer() {
+ return this.mqttConsumer;
+ }
+
+ @Override
+ public EventProducer<MqttTransportProtocol> getProducer() {
+ return this.mqttProducer;
+ }
+}
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java
new file mode 100644
index 0000000..dea95ab
--- /dev/null
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/SpMqttProtocolFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.messaging.mqtt;
+
+import org.apache.streampipes.messaging.SpProtocolDefinition;
+import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+
+public class SpMqttProtocolFactory extends SpProtocolDefinitionFactory<MqttTransportProtocol> {
+
+ @Override
+ public TransportProtocol getTransportProtocol() {
+ return new MqttTransportProtocol();
+ }
+
+ @Override
+ public String getTransportProtocolClass() {
+ return MqttTransportProtocol.class.getCanonicalName();
+ }
+
+ @Override
+ public SpProtocolDefinition<MqttTransportProtocol> createInstance() {
+ return new SpMqttProtocol();
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/MqttTransportProtocol.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/MqttTransportProtocol.java
new file mode 100644
index 0000000..95b30bd
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/MqttTransportProtocol.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.model.grounding;
+
+import io.fogsy.empire.annotations.RdfProperty;
+import io.fogsy.empire.annotations.RdfsClass;
+import org.apache.streampipes.vocabulary.StreamPipes;
+
+import javax.persistence.Entity;
+
+@RdfsClass(StreamPipes.MQTT_TRANSPORT_PROTOCOL)
+@Entity
+public class MqttTransportProtocol extends TransportProtocol {
+
+ @RdfProperty(StreamPipes.HAS_MQTT_PORT)
+ private int port;
+
+ public MqttTransportProtocol(String hostname, int port, String topicName)
+ {
+ super(hostname, new SimpleTopicDefinition(topicName));
+ this.port = port;
+ }
+
+ public MqttTransportProtocol(MqttTransportProtocol other)
+ {
+ super(other);
+ this.port = other.getPort();
+ }
+
+ public MqttTransportProtocol()
+ {
+ super();
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public String toString() {
+ return getBrokerHostname() + ":" + getPort();
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java
index 0262251..3d65d45 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java
@@ -48,10 +48,10 @@ public abstract class TransportProtocol extends UnnamedStreamPipesEntity {
super();
}
- public TransportProtocol(String uri, TopicDefinition topicDefinition)
+ public TransportProtocol(String hostname, TopicDefinition topicDefinition)
{
super();
- this.brokerHostname = uri;
+ this.brokerHostname = hostname;
this.topicDefinition = topicDefinition;
}
diff --git a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
index 102ae03..07fcff4 100644
--- a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
+++ b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.serializers.jsonld;
+import io.fogsy.empire.core.empire.util.EmpireAnnotationProvider;
import org.apache.streampipes.model.ApplicationLink;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
@@ -60,6 +61,7 @@ import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.apache.streampipes.model.grounding.TransportProtocol;
@@ -117,7 +119,6 @@ import org.apache.streampipes.model.template.BoundPipelineElement;
import org.apache.streampipes.model.template.PipelineTemplateDescription;
import org.apache.streampipes.model.template.PipelineTemplateDescriptionContainer;
import org.apache.streampipes.model.template.PipelineTemplateInvocation;
-import io.fogsy.empire.core.empire.util.EmpireAnnotationProvider;
import java.lang.annotation.Annotation;
import java.util.Arrays;
@@ -185,6 +186,7 @@ public class CustomAnnotationProvider implements EmpireAnnotationProvider {
TransportFormat.class,
JmsTransportProtocol.class,
KafkaTransportProtocol.class,
+ MqttTransportProtocol.class,
TransportProtocol.class,
DomainStaticProperty.class,
SupportedProperty.class,
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index 06e8f36..9970a6b 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -111,6 +111,7 @@ public class StreamPipes {
public static final String JMS_TRANSPORT_PROTOCOL = NS + "JmsTransportProtocol";
public static final String KAFKA_TRANSPORT_PROTOCOL = NS + "KafkaTransportProtocol";
+ public static final String MQTT_TRANSPORT_PROTOCOL = NS + "MqttTransportProtocol";
public static final String TRANSPORT_FORMAT = NS + "TransportFormat";
public static final String TRANSPORT_PROTOCOL = NS + "TransportProtocol";
@@ -143,6 +144,7 @@ public class StreamPipes {
public static final String HAS_TRANSPORT_PROTOCOL = NS + "hasTransportProtocol";
public static final String HAS_TRANSPORT_FORMAT = NS + "hasTransportFormat";
public static final String JMS_PORT = NS + "jmsPort";
+ public static final String HAS_MQTT_PORT = NS + "hasMqttPort";
public static final String ZOOKEEPER_HOST = NS + "zookeeperHost";
public static final String ZOOKEEPER_PORT = NS + "zookeeperPort";