You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/20 23:06:15 UTC
[02/18] storm git commit: STORM-1406: Add MQTT Support
STORM-1406: Add MQTT Support
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f157ab42
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f157ab42
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f157ab42
Branch: refs/heads/1.x-branch
Commit: f157ab423f6a539d07c512fac4dd97734e3d62e5
Parents: 4b672de
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Jan 5 16:49:15 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Jan 5 16:49:15 2016 -0500
----------------------------------------------------------------------
external/storm-mqtt/core/pom.xml | 153 +++++++++
.../java/org/apache/storm/mqtt/MqttLogger.java | 45 +++
.../java/org/apache/storm/mqtt/MqttMessage.java | 41 +++
.../apache/storm/mqtt/MqttMessageMapper.java | 44 +++
.../org/apache/storm/mqtt/MqttTupleMapper.java | 37 ++
.../org/apache/storm/mqtt/bolt/MqttBolt.java | 108 ++++++
.../apache/storm/mqtt/common/MqttOptions.java | 334 +++++++++++++++++++
.../apache/storm/mqtt/common/MqttPublisher.java | 101 ++++++
.../org/apache/storm/mqtt/common/MqttUtils.java | 44 +++
.../org/apache/storm/mqtt/common/SslUtils.java | 64 ++++
.../mqtt/mappers/ByteArrayMessageMapper.java | 34 ++
.../storm/mqtt/mappers/StringMessageMapper.java | 37 ++
.../apache/storm/mqtt/spout/AckableMessage.java | 71 ++++
.../org/apache/storm/mqtt/spout/MqttSpout.java | 291 ++++++++++++++++
.../storm/mqtt/ssl/DefaultKeyStoreLoader.java | 97 ++++++
.../apache/storm/mqtt/ssl/KeyStoreLoader.java | 35 ++
.../storm/mqtt/trident/MqttPublishFunction.java | 84 +++++
.../storm/mqtt/StormMqttIntegrationTest.java | 132 ++++++++
external/storm-mqtt/examples/pom.xml | 116 +++++++
.../examples/src/main/flux/sample.yaml | 64 ++++
.../examples/src/main/flux/ssl-sample.yaml | 78 +++++
.../mqtt/examples/CustomMessageMapper.java | 48 +++
.../mqtt/examples/MqttBrokerPublisher.java | 102 ++++++
.../examples/src/main/resources/log4j2.xml | 32 ++
external/storm-mqtt/pom.xml | 42 +++
pom.xml | 1 +
26 files changed, 2235 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/pom.xml b/external/storm-mqtt/core/pom.xml
new file mode 100644
index 0000000..3859e7e
--- /dev/null
+++ b/external/storm-mqtt/core/pom.xml
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>storm-mqtt</artifactId>
+ <packaging>jar</packaging>
+
+ <name>storm-mqtt</name>
+
+ <parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-mqtt-parent</artifactId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>bintray</id>
+ <url>http://dl.bintray.com/andsel/maven/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>5.9.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-mqtt</artifactId>
+ <version>5.9.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <version>5.9.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>1.10</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>perTest</forkMode>
+ <enableAssertions>false</enableAssertions>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <excludedGroups>${java.unit.test.exclude}</excludedGroups>
+ <includes>
+ <include>${java.unit.test.include}</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>${maven-surefire.version}</version>
+ <configuration>
+ <forkMode>perTest</forkMode>
+ <enableAssertions>false</enableAssertions>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <includes>
+ <include>${java.integration.test.include}</include>
+ </includes>
+ <groups>${java.integration.test.group}</groups> <!--set in integration-test the profile-->
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttLogger.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttLogger.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttLogger.java
new file mode 100644
index 0000000..f382a02
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttLogger.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Adapter that routes the MQTT client's {@link Tracer} debug messages to an SLF4J logger.
+ */
+public class MqttLogger extends Tracer {
+ private static final Logger LOG = LoggerFactory.getLogger(MqttLogger.class);
+
+ /**
+ * Writes a trace message at DEBUG level.
+ *
+ * @param message a {@link String#format(String, Object...)} pattern
+ * @param args the arguments substituted into {@code message}
+ */
+ @Override
+ public void debug(String message, Object... args) {
+ // Formatting is skipped entirely unless DEBUG output is enabled for this logger.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(message, args));
+ }
+ }
+
+ /**
+ * Passes the outgoing frame straight to the {@link Tracer} implementation.
+ *
+ * @param frame the frame being sent
+ */
+ @Override
+ public void onSend(MQTTFrame frame) {
+ super.onSend(frame);
+ }
+
+ /**
+ * Passes the incoming frame straight to the {@link Tracer} implementation.
+ *
+ * @param frame the frame that was received
+ */
+ @Override
+ public void onReceive(MQTTFrame frame) {
+ super.onReceive(frame);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessage.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessage.java
new file mode 100644
index 0000000..5436dda
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessage.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+/**
+ * Represents an MQTT Message consisting of a topic string (e.g. "/users/ptgoetz/office/thermostat")
+ * and a byte array message/payload.
+ *
+ * The topic and payload are fixed at construction time. The payload array is stored and returned
+ * by reference (it is not copied), so changes made to the array by a caller are visible here.
+ */
+public class MqttMessage {
+ private final String topic;
+ private final byte[] message;
+
+ /**
+ * Creates a message for the given topic.
+ *
+ * @param topic the MQTT topic name
+ * @param payload the raw message bytes
+ */
+ public MqttMessage(String topic, byte[] payload){
+ this.topic = topic;
+ this.message = payload;
+ }
+
+ /**
+ * @return the payload array passed to the constructor
+ */
+ public byte[] getMessage(){
+ return this.message;
+ }
+
+ /**
+ * @return the topic name passed to the constructor
+ */
+ public String getTopic(){
+ return this.topic;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
new file mode 100644
index 0000000..316e83b
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.io.Serializable;
+
+/**
+ * Converts an {@link MqttMessage} (an MQTT topic name plus a byte array payload) into the
+ * Values of a Storm Tuple, and declares the fields those Values correspond to.
+ */
+public interface MqttMessageMapper extends Serializable {
+ /**
+ * Convert an MqttMessage to a set of Values that can be emitted as a Storm Tuple.
+ *
+ * @param message An MQTT Message.
+ * @return Values representing a Storm Tuple.
+ */
+ Values toValues(MqttMessage message);
+
+ /**
+ * Returns the list of output fields this Mapper produces.
+ *
+ * @return the list of output fields this mapper produces.
+ */
+ Fields outputFields();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
new file mode 100644
index 0000000..f736d55
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Given a Tuple, converts it to an MQTT message (topic plus payload) that can be published.
+ */
+public interface MqttTupleMapper extends Serializable{
+
+ /**
+ * Converts a Tuple to an MqttMessage.
+ *
+ * @param tuple the incoming tuple
+ * @return the message to publish
+ */
+ MqttMessage toMessage(ITuple tuple);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
new file mode 100644
index 0000000..9a2ba3d
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.MqttTupleMapper;
+import org.apache.storm.mqtt.common.MqttPublisher;
+import org.apache.storm.mqtt.common.SslUtils;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+/**
+ * Terminal bolt that turns each incoming tuple into an {@link MqttMessage} via the configured
+ * {@link MqttTupleMapper} and publishes it through an {@link MqttPublisher}. Tick tuples are
+ * acknowledged without being published, and the bolt declares no output fields.
+ */
+public class MqttBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(MqttBolt.class);
+ private MqttTupleMapper mapper;
+ private transient MqttPublisher publisher;
+ private boolean retain = false;
+ private transient OutputCollector collector;
+ private MqttOptions options;
+ private KeyStoreLoader keyStoreLoader;
+ private transient String topologyName;
+
+
+ /**
+ * Creates a bolt without a key store loader and with the retain flag off.
+ *
+ * @param options the MQTT connection options
+ * @param mapper converts tuples to MQTT messages
+ */
+ public MqttBolt(MqttOptions options, MqttTupleMapper mapper){
+ this(options, mapper, null, false);
+ }
+
+ /**
+ * Creates a bolt without a key store loader.
+ *
+ * @param options the MQTT connection options
+ * @param mapper converts tuples to MQTT messages
+ * @param retain the retain flag handed to the publisher for every message
+ */
+ public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean retain){
+ this(options, mapper, null, retain);
+ }
+
+ /**
+ * Creates a bolt with the retain flag off.
+ *
+ * @param options the MQTT connection options
+ * @param mapper converts tuples to MQTT messages
+ * @param keyStoreLoader the key store loader passed on to the SSL check and the publisher
+ */
+ public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader){
+ this(options, mapper, keyStoreLoader, false);
+ }
+
+ /**
+ * Creates a fully specified bolt and validates the SSL configuration immediately.
+ *
+ * @param options the MQTT connection options
+ * @param mapper converts tuples to MQTT messages
+ * @param keyStoreLoader the key store loader passed on to the SSL check and the publisher
+ * @param retain the retain flag handed to the publisher for every message
+ */
+ public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){
+ this.options = options;
+ this.mapper = mapper;
+ this.retain = retain;
+ this.keyStoreLoader = keyStoreLoader;
+ // MqttPublisher's constructor runs this same check; it is repeated here so that an SSL
+ // misconfiguration fails on the client side, rather than when the topology is deployed
+ // to the cluster
+ SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
+ }
+
+ /**
+ * Creates the publisher and connects it to the broker, using
+ * {@code <topology name>-<component id>-<task id>} as the MQTT client id.
+ *
+ * @throws RuntimeException if building the client id or connecting fails
+ */
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+ this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
+ try {
+ String clientId = this.topologyName + "-" + context.getThisComponentId() + "-" + context.getThisTaskId();
+ this.publisher.connectMqtt(clientId);
+ } catch (Exception e) {
+ LOG.error("Unable to connect to MQTT Broker.", e);
+ throw new RuntimeException("Unable to connect to MQTT Broker.", e);
+ }
+ }
+
+ /**
+ * Publishes the mapped message and acks the tuple; if publishing or acking throws, the tuple
+ * is failed instead. Tick tuples are acked and otherwise ignored.
+ *
+ * @param input the tuple to publish
+ */
+ @Override
+ public void execute(Tuple input) {
+ if(TupleUtils.isTick(input)){
+ // tick tuples are not published, only acknowledged
+ this.collector.ack(input);
+ return;
+ }
+ MqttMessage outgoing = this.mapper.toMessage(input);
+ try {
+ this.publisher.publish(outgoing, this.retain);
+ this.collector.ack(input);
+ } catch (Exception e) {
+ LOG.warn("Error publishing MQTT message. Failing tuple.", e);
+ // should we fail the tuple or kill the worker?
+ this.collector.fail(input);
+ }
+ }
+
+ /**
+ * Declares nothing: this bolt does not emit tuples.
+ */
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // intentionally empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
new file mode 100644
index 0000000..7bd4bb0
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * MQTT Configuration Options.
+ *
+ * A serializable holder for the broker URL, topic list, session, Will, reconnect, credential and
+ * QoS settings. Values can be set through the setters or through the nested Builder class.
+ */
+public class MqttOptions implements Serializable {
+ private String url = "tcp://localhost:1883";
+ private List<String> topics = null;
+ private boolean cleanConnection = false;
+
+ private String willTopic;
+ private String willPayload;
+ private int willQos = 1;
+ private boolean willRetain = false;
+
+ private long reconnectDelay = 10;
+ private long reconnectDelayMax = 30*1000;
+ private double reconnectBackOffMultiplier = 2.0f;
+ private long reconnectAttemptsMax = -1;
+ private long connectAttemptsMax = -1;
+
+ private String userName = "";
+ private String password = "";
+
+ private int qos = 1;
+
+ public String getUrl() {
+ return url;
+ }
+
+ /**
+ * Sets the url for connecting to the MQTT broker.
+ *
+ * Default: {@code tcp://localhost:1883}
+ * @param url the broker URL
+ */
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public List<String> getTopics() {
+ return topics;
+ }
+
+ /**
+ * A list of MQTT topics to subscribe to. Unset (null) by default.
+ *
+ * @param topics the topics to subscribe to
+ */
+ public void setTopics(List<String> topics) {
+ this.topics = topics;
+ }
+
+ public boolean isCleanConnection() {
+ return cleanConnection;
+ }
+
+ /**
+ * Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions.
+ * Defaults to false.
+ *
+ * @param cleanConnection true to request a clean session, false to keep session state on the server
+ */
+ public void setCleanConnection(boolean cleanConnection) {
+ this.cleanConnection = cleanConnection;
+ }
+
+ public String getWillTopic() {
+ return willTopic;
+ }
+
+ /**
+ * If set the server will publish the client's Will message to the specified topic if the client has an unexpected
+ * disconnection. Unset (null) by default.
+ *
+ * @param willTopic the topic the Will message is published to
+ */
+ public void setWillTopic(String willTopic) {
+ this.willTopic = willTopic;
+ }
+
+ public String getWillPayload() {
+ return willPayload;
+ }
+
+ /**
+ * The Will message to send. Unset (null) by default.
+ *
+ * @param willPayload the Will message text
+ */
+ public void setWillPayload(String willPayload) {
+ this.willPayload = willPayload;
+ }
+
+ public long getReconnectDelay() {
+ return reconnectDelay;
+ }
+
+ /**
+ * How long to wait in ms before the first reconnect attempt. Defaults to 10.
+ *
+ * @param reconnectDelay the initial reconnect delay in milliseconds
+ */
+ public void setReconnectDelay(long reconnectDelay) {
+ this.reconnectDelay = reconnectDelay;
+ }
+
+ public long getReconnectDelayMax() {
+ return reconnectDelayMax;
+ }
+
+ /**
+ * The maximum amount of time in ms to wait between reconnect attempts. Defaults to 30,000.
+ *
+ * @param reconnectDelayMax the upper bound for the reconnect delay in milliseconds
+ */
+ public void setReconnectDelayMax(long reconnectDelayMax) {
+ this.reconnectDelayMax = reconnectDelayMax;
+ }
+
+ public double getReconnectBackOffMultiplier() {
+ return reconnectBackOffMultiplier;
+ }
+
+ /**
+ * The exponential backoff multiplier used between reconnect attempts. Set to 1 to disable exponential backoff.
+ * Defaults to 2.
+ *
+ * @param reconnectBackOffMultiplier the backoff multiplier
+ */
+ public void setReconnectBackOffMultiplier(double reconnectBackOffMultiplier) {
+ this.reconnectBackOffMultiplier = reconnectBackOffMultiplier;
+ }
+
+ public long getReconnectAttemptsMax() {
+ return reconnectAttemptsMax;
+ }
+
+ /**
+ * The maximum number of reconnect attempts before an error is reported back to the client after a server
+ * connection had previously been established. Set to -1 to use unlimited attempts. Defaults to -1.
+ *
+ * @param reconnectAttemptsMax the reconnect attempt limit, or -1 for unlimited
+ */
+ public void setReconnectAttemptsMax(long reconnectAttemptsMax) {
+ this.reconnectAttemptsMax = reconnectAttemptsMax;
+ }
+
+ public long getConnectAttemptsMax() {
+ return connectAttemptsMax;
+ }
+
+ /**
+ * The maximum number of connection attempts before an error is reported back to the client when the client
+ * connects to a server for the first time. Set to -1 to use unlimited attempts. Defaults to -1.
+ *
+ * @param connectAttemptsMax the initial connect attempt limit, or -1 for unlimited
+ */
+ public void setConnectAttemptsMax(long connectAttemptsMax) {
+ this.connectAttemptsMax = connectAttemptsMax;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * The username for authenticated sessions. Defaults to an empty string.
+ *
+ * @param userName the user name
+ */
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * The password for authenticated sessions. Defaults to an empty string.
+ * @param password the password
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public int getQos(){
+ return this.qos;
+ }
+
+ /**
+ * Sets the quality of service to use for MQTT messages. Defaults to 1 (at least once).
+ * @param qos the QoS level: 0, 1 or 2
+ * @throws IllegalArgumentException if qos is less than 0 or greater than 2
+ */
+ public void setQos(int qos){
+ if(qos < 0 || qos > 2){
+ throw new IllegalArgumentException("MQTT QoS must be >= 0 and <= 2");
+ }
+ this.qos = qos;
+ }
+
+ public int getWillQos(){
+ return this.willQos;
+ }
+
+ /**
+ * Sets the quality of service to use for the MQTT Will message. Defaults to 1 (at least once).
+ *
+ * @param qos the QoS level: 0, 1 or 2
+ * @throws IllegalArgumentException if qos is less than 0 or greater than 2
+ */
+ public void setWillQos(int qos){
+ if(qos < 0 || qos > 2){
+ throw new IllegalArgumentException("MQTT Will QoS must be >= 0 and <= 2");
+ }
+ this.willQos = qos;
+ }
+
+ public boolean getWillRetain(){
+ return this.willRetain;
+ }
+
+ /**
+ * Set to true if you want the Will message to be published with the retain option. Defaults to false.
+ * @param retain true to publish the Will message with the retain option
+ */
+ public void setWillRetain(boolean retain){
+ this.willRetain = retain;
+ }
+
+ public static class Builder {
+ private MqttOptions options = new MqttOptions();
+
+ public Builder url(String url) {
+ this.options.url = url;
+ return this;
+ }
+
+
+ public Builder topics(List<String> topics) {
+ this.options.topics = topics;
+ return this;
+ }
+
+ public Builder cleanConnection(boolean cleanConnection) {
+ this.options.cleanConnection = cleanConnection;
+ return this;
+ }
+
+ public Builder willTopic(String willTopic) {
+ this.options.willTopic = willTopic;
+ return this;
+ }
+
+ public Builder willPayload(String willPayload) {
+ this.options.willPayload = willPayload;
+ return this;
+ }
+
+ public Builder willRetain(boolean retain){
+ this.options.willRetain = retain;
+ return this;
+ }
+
+ public Builder willQos(int qos){
+ this.options.setWillQos(qos);
+ return this;
+ }
+
+ public Builder reconnectDelay(long reconnectDelay) {
+ this.options.reconnectDelay = reconnectDelay;
+ return this;
+ }
+
+ public Builder reconnectDelayMax(long reconnectDelayMax) {
+ this.options.reconnectDelayMax = reconnectDelayMax;
+ return this;
+ }
+
+ public Builder reconnectBackOffMultiplier(double reconnectBackOffMultiplier) {
+ this.options.reconnectBackOffMultiplier = reconnectBackOffMultiplier;
+ return this;
+ }
+
+ public Builder reconnectAttemptsMax(long reconnectAttemptsMax) {
+ this.options.reconnectAttemptsMax = reconnectAttemptsMax;
+ return this;
+ }
+
+ public Builder connectAttemptsMax(long connectAttemptsMax) {
+ this.options.connectAttemptsMax = connectAttemptsMax;
+ return this;
+ }
+
+ public Builder userName(String userName) {
+ this.options.userName = userName;
+ return this;
+ }
+
+ public Builder password(String password) {
+ this.options.password = password;
+ return this;
+ }
+
+ public Builder qos(int qos){
+ this.options.setQos(qos);
+ return this;
+ }
+
+ public MqttOptions build() {
+ return this.options; // NOTE: returns the builder's own instance, not a copy
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
new file mode 100644
index 0000000..176a208
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+
+import org.apache.storm.mqtt.MqttLogger;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+/**
+ * Publishes {@link MqttMessage}s over a {@link BlockingConnection} configured from
+ * {@link MqttOptions}. {@link #connectMqtt(String)} must be called before publishing.
+ */
+public class MqttPublisher {
+ private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class);
+
+ private MqttOptions options;
+ private transient BlockingConnection connection;
+ private KeyStoreLoader keyStoreLoader;
+ private QoS qos;
+ private boolean retain = false;
+
+
+ /**
+ * Creates a publisher without a key store loader and with the default retain flag off.
+ *
+ * @param options the MQTT connection options
+ */
+ public MqttPublisher(MqttOptions options){
+ this(options, null, false);
+ }
+
+ /**
+ * Creates a publisher without a key store loader.
+ *
+ * @param options the MQTT connection options
+ * @param retain the default retain flag, used by {@link #publish(MqttMessage)}
+ */
+ public MqttPublisher(MqttOptions options, boolean retain){
+ this(options, null, retain);
+ }
+
+ /**
+ * Creates a publisher, validating the SSL configuration and the configured QoS level.
+ *
+ * @param options the MQTT connection options
+ * @param keyStoreLoader the key store loader used for the SSL check and the SSL context
+ * @param retain the default retain flag, used by {@link #publish(MqttMessage)}
+ * @throws IllegalArgumentException if the configured QoS is not a valid MQTT QoS
+ */
+ public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain){
+ this.retain = retain;
+ this.options = options;
+ this.keyStoreLoader = keyStoreLoader;
+ SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
+ this.qos = MqttUtils.qosFromInt(this.options.getQos());
+ }
+
+ /**
+ * Publishes a message using the retain flag given at construction time.
+ *
+ * @param message the topic and payload to publish
+ * @throws Exception if publishing fails
+ */
+ public void publish(MqttMessage message) throws Exception {
+ publish(message, this.retain);
+ }
+
+ /**
+ * Publishes a message with the configured QoS and the given retain flag.
+ *
+ * @param message the topic and payload to publish
+ * @param retain whether the message is published with the retain flag
+ * @throws IllegalStateException if {@link #connectMqtt(String)} has not been called yet
+ * @throws Exception if publishing fails
+ */
+ public void publish(MqttMessage message, boolean retain) throws Exception {
+ if(this.connection == null){
+ throw new IllegalStateException("Not connected to an MQTT broker; call connectMqtt() first.");
+ }
+ // Use the caller's flag: previously the argument was ignored in favor of the constructor value.
+ this.connection.publish(message.getTopic(), message.getMessage(), this.qos, retain);
+ }
+
+
+
+ /**
+ * Builds an MQTT client from the options and opens a blocking connection to the broker.
+ * An SSL context is installed for every URL scheme other than {@code tcp}. The Will is
+ * configured only when both a Will topic and a Will payload are set.
+ *
+ * @param clientId the MQTT client id to connect with
+ * @throws Exception if the client cannot be configured or the connection attempt fails
+ */
+ public void connectMqtt(String clientId) throws Exception {
+ MQTT client = new MQTT();
+ URI uri = URI.create(this.options.getUrl());
+
+ client.setHost(uri);
+ if(!uri.getScheme().toLowerCase().equals("tcp")){
+ client.setSslContext(SslUtils.sslContext(uri.getScheme(), this.keyStoreLoader));
+ }
+ client.setClientId(clientId);
+ LOG.info("MQTT ClientID: {}", client.getClientId());
+ client.setCleanSession(this.options.isCleanConnection());
+
+ client.setReconnectDelay(this.options.getReconnectDelay());
+ client.setReconnectDelayMax(this.options.getReconnectDelayMax());
+ client.setReconnectBackOffMultiplier(this.options.getReconnectBackOffMultiplier());
+ client.setConnectAttemptsMax(this.options.getConnectAttemptsMax());
+ client.setReconnectAttemptsMax(this.options.getReconnectAttemptsMax());
+
+
+ client.setUserName(this.options.getUserName());
+ client.setPassword(this.options.getPassword());
+ client.setTracer(new MqttLogger());
+
+ if(this.options.getWillTopic() != null && this.options.getWillPayload() != null){
+ // named willQos so it does not shadow the publisher's own qos field
+ QoS willQos = MqttUtils.qosFromInt(this.options.getWillQos());
+ client.setWillQos(willQos);
+ client.setWillTopic(this.options.getWillTopic());
+ client.setWillMessage(this.options.getWillPayload());
+ client.setWillRetain(this.options.getWillRetain());
+ }
+
+ this.connection = client.blockingConnection();
+ this.connection.connect();
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
new file mode 100644
index 0000000..141151a
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+
+import org.fusesource.mqtt.client.QoS;
+
+/**
+ * Static helper methods for working with the MQTT client library's types.
+ */
+public class MqttUtils {
+
+    private MqttUtils(){}
+
+    /**
+     * Converts a numeric MQTT quality-of-service level into the corresponding {@link QoS} constant.
+     *
+     * @param i the QoS level; must be 0, 1 or 2
+     * @return {@code AT_MOST_ONCE} for 0, {@code AT_LEAST_ONCE} for 1, {@code EXACTLY_ONCE} for 2
+     * @throws IllegalArgumentException if {@code i} is not 0, 1 or 2
+     */
+    public static QoS qosFromInt(int i){
+        switch(i) {
+            case 0:
+                return QoS.AT_MOST_ONCE;
+            case 1:
+                return QoS.AT_LEAST_ONCE;
+            case 2:
+                return QoS.EXACTLY_ONCE;
+            default:
+                // The separating space was missing, producing e.g. "3is not a valid MQTT QoS."
+                throw new IllegalArgumentException(i + " is not a valid MQTT QoS.");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/SslUtils.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
new file mode 100644
index 0000000..681fc1d
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import java.net.URI;
+import java.security.KeyStore;
+
+/**
+ * Static helpers for validating MQTT broker URLs and for building the {@link SSLContext}
+ * used by TLS/SSL connections.
+ */
+public class SslUtils {
+    private SslUtils(){}
+
+    /**
+     * Verifies that the URL uses a recognized scheme ("tcp", or one starting with "tls" or "ssl")
+     * and that a {@link KeyStoreLoader} is available whenever the scheme is not plain "tcp".
+     *
+     * @param url the MQTT broker URL
+     * @param loader the keystore loader, may be {@code null} for plain "tcp" URLs
+     * @throws IllegalArgumentException if the URL has no scheme or the scheme is not recognized
+     * @throws IllegalStateException if a TLS/SSL scheme is used without a loader
+     */
+    public static void checkSslConfig(String url, KeyStoreLoader loader){
+        URI uri = URI.create(url);
+        // URI.getScheme() returns null for scheme-less input (e.g. "//host:1883"); fail with a
+        // meaningful message instead of a NullPointerException.
+        if(uri.getScheme() == null){
+            throw new IllegalArgumentException("MQTT URL has no scheme: " + url);
+        }
+        String scheme = uri.getScheme().toLowerCase();
+        boolean plainTcp = scheme.equals("tcp");
+        if(!(plainTcp || scheme.startsWith("tls") || scheme.startsWith("ssl"))){
+            throw new IllegalArgumentException("Unrecognized URI scheme: " + scheme);
+        }
+        if(!plainTcp && loader == null){
+            throw new IllegalStateException("A TLS/SSL MQTT URL was specified, but no KeyStoreLoader configured. " +
+                    "A KeyStoreLoader implementation is required when using TLS/SSL.");
+        }
+    }
+
+    /**
+     * Builds an {@link SSLContext} from the JKS keystore and truststore supplied by the loader.
+     * The upper-cased URI scheme is used as the SSLContext protocol name.
+     *
+     * @param scheme the URI scheme of the broker URL (e.g. "tls" or "ssl")
+     * @param keyStoreLoader source of the keystore/truststore streams and passwords
+     * @return an initialized SSLContext
+     * @throws Exception if a store cannot be read or the context cannot be created
+     */
+    public static SSLContext sslContext(String scheme, KeyStoreLoader keyStoreLoader) throws Exception {
+        // The streams are opened by the loader on our behalf, so close them once the stores are read.
+        KeyStore ks = KeyStore.getInstance("JKS");
+        try (java.io.InputStream keyStoreStream = keyStoreLoader.keyStoreInputStream()) {
+            ks.load(keyStoreStream, keyStoreLoader.keyStorePassword().toCharArray());
+        }
+
+        KeyStore ts = KeyStore.getInstance("JKS");
+        try (java.io.InputStream trustStoreStream = keyStoreLoader.trustStoreInputStream()) {
+            ts.load(trustStoreStream, keyStoreLoader.trustStorePassword().toCharArray());
+        }
+
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        kmf.init(ks, keyStoreLoader.keyPassword().toCharArray());
+
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        tmf.init(ts);
+
+        SSLContext sc = SSLContext.getInstance(scheme.toUpperCase());
+        TrustManager[] trustManagers = tmf.getTrustManagers();
+        sc.init(kmf.getKeyManagers(), trustManagers, null);
+
+        return sc;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
new file mode 100644
index 0000000..f465177
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.mappers;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.MqttMessageMapper;
+
+
+/**
+ * Maps an MQTT message to a two-field tuple: "topic" carries the message topic and
+ * "message" carries the payload exactly as returned by {@code MqttMessage.getMessage()}.
+ */
+public class ByteArrayMessageMapper implements MqttMessageMapper {
+
+    /**
+     * @return the names of the emitted fields, in order: "topic", "message"
+     */
+    @Override
+    public Fields outputFields() {
+        return new Fields("topic", "message");
+    }
+
+    /**
+     * @param message the received MQTT message
+     * @return tuple values holding the topic followed by the unconverted payload
+     */
+    @Override
+    public Values toValues(MqttMessage message) {
+        final Values tupleValues = new Values(message.getTopic(), message.getMessage());
+        return tupleValues;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
new file mode 100644
index 0000000..19a7245
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.mappers;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.MqttMessageMapper;
+
+/**
+ * Given a String topic and byte[] message, emits a tuple with fields
+ * "topic" and "message", both of which are Strings.
+ */
+public class StringMessageMapper implements MqttMessageMapper {
+ public Values toValues(MqttMessage message) {
+ return new Values(message.getTopic(), new String(message.getMessage()));
+ }
+
+ public Fields outputFields() {
+ return new Fields("topic", "message");
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
new file mode 100644
index 0000000..08348c9
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.spout;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.storm.mqtt.MqttMessage;
+
+/**
+ * Pairs a received MQTT message (topic string plus byte-array payload) with the
+ * {@link Runnable} that acknowledges it.
+ *
+ * Equality and hash code are based on the topic and payload contents only; the
+ * acknowledgement callback does not take part in either.
+ */
+class AckableMessage {
+    private final String topic;
+    private final byte[] message;
+    private final Runnable ack;
+
+    /**
+     * @param topic the topic the message was published to
+     * @param message the raw message payload
+     * @param ack callback to run once the message has been fully processed
+     */
+    AckableMessage(String topic, byte[] message, Runnable ack){
+        this.topic = topic;
+        this.message = message;
+        this.ack = ack;
+    }
+
+    /**
+     * @return a new {@link MqttMessage} carrying this message's topic and payload
+     */
+    public MqttMessage getMessage(){
+        return new MqttMessage(this.topic, this.message);
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder(71, 123)
+                .append(this.topic)
+                .append(this.message)
+                .toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) { return false; }
+        if (obj == this) { return true; }
+        if (obj.getClass() != getClass()) {
+            return false;
+        }
+        AckableMessage tm = (AckableMessage)obj;
+        // No appendSuper(super.equals(obj)) here: the superclass is Object, whose equals() is an
+        // identity check, so it would be false for every distinct instance and make equals()
+        // inconsistent with the value-based hashCode() above.
+        return new EqualsBuilder()
+                .append(this.topic, tm.topic)
+                .append(this.message, tm.message)
+                .isEquals();
+    }
+
+    /**
+     * @return the acknowledgement callback supplied at construction time
+     */
+    Runnable ack(){
+        return this.ack;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
new file mode 100644
index 0000000..d1b0417
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.spout;
+
+import backtype.storm.Config;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.mqtt.MqttLogger;
+import org.apache.storm.mqtt.MqttMessageMapper;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.common.MqttUtils;
+import org.apache.storm.mqtt.common.SslUtils;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.client.Callback;
+import org.fusesource.mqtt.client.CallbackConnection;
+import org.fusesource.mqtt.client.Listener;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Storm spout that subscribes to the MQTT topics configured in {@link MqttOptions} and emits
+ * every received message as a tuple built by the configured {@link MqttMessageMapper}.
+ *
+ * Received messages are buffered in {@code incoming}; once emitted they are tracked in
+ * {@code pending} under a generated message id. When Storm acks the id, the message's MQTT
+ * acknowledgement callback is executed on the connection's dispatch queue; when Storm fails the
+ * id, the message is put back on {@code incoming} to be emitted again.
+ */
+public class MqttSpout implements IRichSpout, Listener {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttSpout.class);
+
+    private String topologyName;
+
+
+    private CallbackConnection connection;
+
+    protected transient SpoutOutputCollector collector;
+    protected transient TopologyContext context;
+    protected transient LinkedBlockingQueue<AckableMessage> incoming;
+    protected transient HashMap<Long, AckableMessage> pending;
+    private transient Map conf;
+    protected MqttMessageMapper type;
+    protected MqttOptions options;
+    protected KeyStoreLoader keyStoreLoader;
+
+    // Written by ConnectCallback and polled by connectMqtt(). The callback may be invoked on a
+    // different thread than the one polling, so the flags are volatile to make the update visible.
+    private volatile boolean mqttConnected = false;
+    private volatile boolean mqttConnectFailed = false;
+    // Cause reported by ConnectCallback.onFailure(), kept so connectMqtt() can propagate it.
+    private transient volatile Throwable mqttConnectFailure;
+
+
+    private long sequence = Long.MIN_VALUE;
+
+    /**
+     * Generates the next message id, wrapping back to {@code Long.MIN_VALUE} once the counter
+     * reaches {@code Long.MAX_VALUE}.
+     */
+    private Long nextId(){
+        this.sequence++;
+        if(this.sequence == Long.MAX_VALUE){
+            this.sequence = Long.MIN_VALUE;
+        }
+        return this.sequence;
+    }
+
+
+    /**
+     * Creates a spout without a keystore loader (only valid for plain "tcp" URLs).
+     *
+     * @param type mapper that converts MQTT messages to tuple values
+     * @param options MQTT connection and subscription options
+     */
+    public MqttSpout(MqttMessageMapper type, MqttOptions options){
+        this(type, options, null);
+    }
+
+    /**
+     * Creates a spout and validates the URL/keystore combination up front, so that an SSL
+     * misconfiguration is reported at construction time.
+     *
+     * @param type mapper that converts MQTT messages to tuple values
+     * @param options MQTT connection and subscription options
+     * @param keyStoreLoader keystore source, required when the URL uses a TLS/SSL scheme
+     */
+    public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader){
+        this.type = type;
+        this.options = options;
+        this.keyStoreLoader = keyStoreLoader;
+        SslUtils.checkSslConfig(this.options.getUrl(), this.keyStoreLoader);
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(this.type.outputFields());
+    }
+
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    /**
+     * Initializes the message buffers and connects to the MQTT broker.
+     *
+     * @throws RuntimeException if the MQTT connection cannot be established
+     */
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+
+        this.collector = collector;
+        this.context = context;
+        this.conf = conf;
+
+        this.incoming = new LinkedBlockingQueue<>();
+        this.pending = new HashMap<>();
+
+        try {
+            connectMqtt();
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            throw new RuntimeException("MQTT Connection failed.", e);
+        }
+
+    }
+
+    /**
+     * Configures the MQTT client, connects asynchronously, waits for the connect callback to
+     * report success or failure, and on success subscribes to the configured topics.
+     *
+     * @throws Exception if the SSL context cannot be built, the wait is interrupted, or the
+     *                   connect callback reports a failure
+     */
+    private void connectMqtt() throws Exception {
+        MQTT client = new MQTT();
+        URI uri = URI.create(this.options.getUrl());
+
+        client.setHost(uri);
+        if(!uri.getScheme().toLowerCase().equals("tcp")){
+            client.setSslContext(SslUtils.sslContext(uri.getScheme(), this.keyStoreLoader));
+        }
+        client.setClientId(this.topologyName + "-" + this.context.getThisComponentId() + "-" +
+                this.context.getThisTaskId());
+        LOG.info("MQTT ClientID: " + client.getClientId().toString());
+        client.setCleanSession(this.options.isCleanConnection());
+
+        client.setReconnectDelay(this.options.getReconnectDelay());
+        client.setReconnectDelayMax(this.options.getReconnectDelayMax());
+        client.setReconnectBackOffMultiplier(this.options.getReconnectBackOffMultiplier());
+        client.setConnectAttemptsMax(this.options.getConnectAttemptsMax());
+        client.setReconnectAttemptsMax(this.options.getReconnectAttemptsMax());
+
+
+        client.setUserName(this.options.getUserName());
+        client.setPassword(this.options.getPassword());
+        client.setTracer(new MqttLogger());
+
+        if(this.options.getWillTopic() != null && this.options.getWillPayload() != null){
+            client.setWillTopic(this.options.getWillTopic());
+            client.setWillMessage(this.options.getWillPayload());
+            client.setWillRetain(this.options.getWillRetain());
+            QoS qos = MqttUtils.qosFromInt(this.options.getWillQos());
+            client.setWillQos(qos);
+        }
+
+
+        this.connection = client.callbackConnection();
+        this.connection.listener(this);
+        this.connection.connect(new ConnectCallback());
+
+        while(!this.mqttConnected && !this.mqttConnectFailed){
+            LOG.info("Waiting for connection...");
+            Thread.sleep(500);
+        }
+
+        if(!this.mqttConnected){
+            // Previously a failed connect was ignored here, leaving a spout that never
+            // subscribed and never emitted. Surface it so open() can report the error.
+            throw new IllegalStateException("Unable to connect to MQTT broker at " + this.options.getUrl(),
+                    this.mqttConnectFailure);
+        }
+
+        List<String> topicList = this.options.getTopics();
+        Topic[] topics = new Topic[topicList.size()];
+        QoS qos = MqttUtils.qosFromInt(this.options.getQos());
+        for(int i = 0;i < topicList.size();i++){
+            topics[i] = new Topic(topicList.get(i), qos);
+        }
+        connection.subscribe(topics, new SubscribeCallback());
+    }
+
+
+
+    public void close() {
+        // The connection is only created in open(); guard against close() without a connection.
+        if(this.connection != null){
+            this.connection.disconnect(new DisconnectCallback());
+        }
+    }
+
+    public void activate() {
+    }
+
+    public void deactivate() {
+    }
+
+    /**
+     * When this method is called, Storm is requesting that the Spout emit tuples to the
+     * output collector. This method should be non-blocking, so if the Spout has no tuples
+     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
+     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
+     * to have nextTuple sleep for a short amount of time (like a single millisecond)
+     * so as not to waste too much CPU.
+     */
+    public void nextTuple() {
+        AckableMessage tm = this.incoming.poll();
+        if(tm != null){
+            Long id = nextId();
+            this.collector.emit(this.type.toValues(tm.getMessage()), id);
+            this.pending.put(id, tm);
+        } else {
+            Thread.yield();
+        }
+
+    }
+
+    /**
+     * Storm has determined that the tuple emitted by this spout with the msgId identifier
+     * has been fully processed. The message is removed from the pending map and its MQTT
+     * acknowledgement callback is executed on the connection's dispatch queue.
+     *
+     * @param msgId the id the tuple was emitted with
+     */
+    public void ack(Object msgId) {
+        AckableMessage msg = this.pending.remove(msgId);
+        if(msg == null){
+            // Unknown id: nothing to acknowledge.
+            LOG.warn("Received ack for unknown message id: {}", msgId);
+            return;
+        }
+        this.connection.getDispatchQueue().execute(msg.ack());
+    }
+
+    /**
+     * The tuple emitted by this spout with the msgId identifier has failed to be
+     * fully processed. The message is removed from the pending map and put back on the
+     * incoming queue so that it is emitted again.
+     *
+     * @param msgId the id the tuple was emitted with
+     */
+    public void fail(Object msgId) {
+        AckableMessage msg = this.pending.remove(msgId);
+        if(msg == null){
+            // LinkedBlockingQueue rejects null elements, so an unknown id must not be re-queued.
+            LOG.warn("Received fail for unknown message id: {}", msgId);
+            return;
+        }
+        try {
+            this.incoming.put(msg);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while re-queueing message.", e);
+            // Restore the interrupt status so code further up the stack can observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+    // ################# Listener Implementation ######################
+    public void onConnected() {
+        // this gets called repeatedly for no apparent reason, don't do anything
+    }
+
+    public void onDisconnected() {
+        // this gets called repeatedly for no apparent reason, don't do anything
+    }
+
+    /**
+     * Buffers a message received from the broker, together with its acknowledgement callback,
+     * until nextTuple() emits it.
+     */
+    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
+        // Only build the payload string when it will actually be logged.
+        if(LOG.isDebugEnabled()){
+            LOG.debug("Received message: topic={}, payload={}", topic.toString(), new String(payload.toByteArray()));
+        }
+        try {
+            this.incoming.put(new AckableMessage(topic.toString(), payload.toByteArray(), ack));
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while queueing an MQTT message.");
+            // Restore the interrupt status so code further up the stack can observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public void onFailure(Throwable throwable) {
+        LOG.error("MQTT Connection Failure.", throwable);
+        MqttSpout.this.connection.disconnect(new DisconnectCallback());
+        throw new RuntimeException("MQTT Connection failure.", throwable);
+    }
+
+    // ################# Connect Callback Implementation ######################
+    private class ConnectCallback implements Callback<Void> {
+        public void onSuccess(Void v) {
+            LOG.info("MQTT Connected. Subscribing to topic...");
+            MqttSpout.this.mqttConnected = true;
+        }
+
+        public void onFailure(Throwable throwable) {
+            LOG.info("MQTT Connection failed.", throwable);
+            // Record the cause before raising the flag so the polling thread sees both.
+            MqttSpout.this.mqttConnectFailure = throwable;
+            MqttSpout.this.mqttConnectFailed = true;
+        }
+    }
+
+    // ################# Subscribe Callback Implementation ######################
+    private class SubscribeCallback implements Callback<byte[]>{
+        public void onSuccess(byte[] qos) {
+            LOG.info("Subscripton sucessful.");
+        }
+
+        public void onFailure(Throwable throwable) {
+            LOG.error("MQTT Subscripton failed.", throwable);
+            throw new RuntimeException("MQTT Subscribe failed.", throwable);
+        }
+    }
+
+    // ################# Disconnect Callback Implementation ######################
+    private class DisconnectCallback implements Callback<Void>{
+        public void onSuccess(Void aVoid) {
+            LOG.info("MQTT Disconnect successful.");
+        }
+
+        public void onFailure(Throwable throwable) {
+            // Disconnects don't fail.
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
new file mode 100644
index 0000000..8bca407
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.ssl;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+/**
+ * KeyStoreLoader implementation that uses local files.
+ */
+public class DefaultKeyStoreLoader implements KeyStoreLoader {
+ private String ksFile = null;
+ private String tsFile = null;
+ private String keyStorePassword = "";
+ private String trustStorePassword = "";
+ private String keyPassword = "";
+
+ /**
+ * Creates a DefaultKeystoreLoader that uses the same file
+ * for both the keystore and truststore.
+ *
+ * @param keystore path to keystore file
+ */
+ public DefaultKeyStoreLoader(String keystore){
+ this.ksFile = keystore;
+ }
+
+ /**
+ * Creates a DefaultKeystoreLoader that uses separate files
+ * for the keystore and truststore.
+ *
+ * @param keystore path to keystore file
+ * @param truststore path to truststore file
+ */
+ public DefaultKeyStoreLoader(String keystore, String truststore){
+ this.ksFile = keystore;
+ this.tsFile = truststore;
+ }
+
+ public void setKeyStorePassword(String keyStorePassword) {
+ this.keyStorePassword = keyStorePassword;
+ }
+
+ public void setTrustStorePassword(String trustStorePassword) {
+ this.trustStorePassword = trustStorePassword;
+ }
+
+ public void setKeyPassword(String keyPassword) {
+ this.keyPassword = keyPassword;
+ }
+
+ @Override
+ public InputStream keyStoreInputStream() throws FileNotFoundException {
+ return new FileInputStream(this.ksFile);
+ }
+
+ @Override
+ public InputStream trustStoreInputStream() throws FileNotFoundException {
+ // if no truststore file, assume the truststore is the keystore.
+ if(this.tsFile == null){
+ return new FileInputStream(this.ksFile);
+ } else {
+ return new FileInputStream(this.tsFile);
+ }
+ }
+
+ @Override
+ public String keyStorePassword() {
+ return this.keyStorePassword;
+ }
+
+ @Override
+ public String trustStorePassword() {
+ return this.trustStorePassword;
+ }
+
+ @Override
+ public String keyPassword() {
+ return this.keyPassword;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
new file mode 100644
index 0000000..297efcc
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * Source of the keystore and truststore material needed for TLS/SSL connections.
+ * Implementations decide where the stores come from (file system, HDFS, etc.).
+ */
+public interface KeyStoreLoader extends Serializable {
+
+    /**
+     * @return a stream over the keystore contents
+     * @throws IOException if the keystore cannot be opened
+     */
+    InputStream keyStoreInputStream() throws IOException;
+
+    /**
+     * @return a stream over the truststore contents
+     * @throws IOException if the truststore cannot be opened
+     */
+    InputStream trustStoreInputStream() throws IOException;
+
+    /**
+     * @return the password for the keystore
+     */
+    String keyStorePassword();
+
+    /**
+     * @return the password for the truststore
+     */
+    String trustStorePassword();
+
+    /**
+     * @return the password for the key held in the keystore
+     */
+    String keyPassword();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
new file mode 100644
index 0000000..369d342
--- /dev/null
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.trident;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.FailedException;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.MqttTupleMapper;
+import org.apache.storm.mqtt.common.MqttPublisher;
+import org.apache.storm.mqtt.common.SslUtils;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.operation.TridentOperationContext;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.Map;
+
+public class MqttPublishFunction extends BaseFunction {
+ private static final Logger LOG = LoggerFactory.getLogger(MqttPublishFunction.class);
+ private MqttTupleMapper mapper;
+ private transient MqttPublisher publisher;
+ private boolean retain = false;
+ private transient OutputCollector collector;
+ private MqttOptions options;
+ private KeyStoreLoader keyStoreLoader;
+ private transient String topologyName;
+
+
+ public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){
+ this.options = options;
+ this.mapper = mapper;
+ this.retain = retain;
+ this.keyStoreLoader = keyStoreLoader;
+ // the following code is duplicated in the constructor of MqttPublisher
+ // we reproduce it here so we fail on the client side if SSL is misconfigured, rather than when the topology
+ // is deployed to the cluster
+ SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
+ }
+
+
+ @Override
+ public void prepare(Map conf, TridentOperationContext context) {
+ this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+ this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
+ try {
+ this.publisher.connectMqtt(this.topologyName + "-" + context.getPartitionIndex());
+ } catch (Exception e) {
+ LOG.error("Unable to connect to MQTT Broker.", e);
+ throw new RuntimeException("Unable to connect to MQTT Broker.", e);
+ }
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ MqttMessage message = this.mapper.toMessage(tuple);
+ try {
+ this.publisher.publish(message, this.retain);
+ } catch (Exception e) {
+ LOG.warn("Error publishing MQTT message. Failing tuple.", e);
+ // should we fail the batch or kill the worker?
+ throw new FailedException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java b/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
new file mode 100644
index 0000000..0b4f652
--- /dev/null
+++ b/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.testing.IntegrationTest;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.ITuple;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.storm.mqtt.bolt.MqttBolt;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.common.MqttPublisher;
+import org.apache.storm.mqtt.mappers.StringMessageMapper;
+import org.apache.storm.mqtt.spout.MqttSpout;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Arrays;
+
+/**
+ * End-to-end test of the MQTT spout and bolt against an embedded broker.
+ * <p>
+ * A broker with an MQTT connector on localhost:1883 is started once for the class. The test
+ * subscribes to "/integration-result", runs a local topology whose spout listens on
+ * "/mqtt-topology" and whose bolt republishes to "/integration-result", publishes one message
+ * to "/mqtt-topology" and then verifies what arrives on the result topic.
+ * <p>
+ * The class is Serializable because the anonymous tuple mapper created in
+ * {@link #buildMqttTopology()} is an inner class and is handed to the bolt.
+ */
+@Category(IntegrationTest.class)
+public class StormMqttIntegrationTest implements Serializable{
+    private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class);
+    private static BrokerService broker;
+
+
+    /** Stops the embedded broker, if it was created. */
+    @AfterClass
+    public static void cleanup() throws Exception {
+        // start() may have failed before the broker was assigned; avoid masking that with an NPE
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    /** Starts the embedded broker with an MQTT connector on localhost:1883. */
+    @BeforeClass
+    public static void start() throws Exception {
+        LOG.warn("Starting broker...");
+        broker = new BrokerService();
+        broker.addConnector("mqtt://localhost:1883");
+        broker.setDataDirectory("target");
+        broker.start();
+        LOG.debug("MQTT broker started");
+    }
+
+
+    /**
+     * Publishes a message to the topology's input topic and asserts that the bolt's output
+     * arrives on "/integration-result" with the payload "hello mqtt".
+     */
+    @Test
+    public void testMqttTopology() throws Exception {
+        MQTT client = new MQTT();
+        client.setTracer(new MqttLogger());
+        URI uri = URI.create("tcp://localhost:1883");
+        client.setHost(uri);
+
+        client.setClientId("MQTTSubscriber");
+        client.setCleanSession(false);
+        BlockingConnection connection = client.blockingConnection();
+        connection.connect();
+
+        LocalCluster cluster = null;
+        try {
+            Topic[] topics = {new Topic("/integration-result", QoS.AT_LEAST_ONCE)};
+            connection.subscribe(topics);
+
+            cluster = new LocalCluster();
+            cluster.submitTopology("test", new Config(), buildMqttTopology());
+
+            LOG.info("topology started");
+            // give the topology time to start and the spout time to subscribe
+            Thread.sleep(10000);
+
+            // publish a retained message to the broker
+            MqttOptions options = new MqttOptions();
+            options.setCleanConnection(false);
+            MqttPublisher publisher = new MqttPublisher(options);
+            publisher.connectMqtt("MqttPublisher");
+            publisher.publish(new MqttMessage("/mqtt-topology", "test".getBytes()), true);
+
+            LOG.info("published message");
+
+            Message message = connection.receive();
+            LOG.info("Message received on topic: " + message.getTopic());
+            LOG.info("Payload: " + new String(message.getPayload()));
+            message.ack();
+
+            // JUnit expects (expected, actual); the reverse order yields misleading failure messages
+            Assert.assertArrayEquals("hello mqtt".getBytes(), message.getPayload());
+            Assert.assertEquals("/integration-result", message.getTopic());
+        } finally {
+            // release the cluster and the subscriber connection even when an assertion fails
+            try {
+                if (cluster != null) {
+                    cluster.shutdown();
+                }
+            } finally {
+                connection.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Builds the topology under test: an MQTT spout subscribed to "/mqtt-topology" feeding,
+     * via shuffle grouping, an MQTT bolt that publishes "hello mqtt" to "/integration-result"
+     * for every tuple it receives.
+     *
+     * @return the assembled topology
+     */
+    public StormTopology buildMqttTopology(){
+        TopologyBuilder builder = new TopologyBuilder();
+
+        MqttOptions options = new MqttOptions();
+        options.setTopics(Arrays.asList("/mqtt-topology"));
+        options.setCleanConnection(false);
+        MqttSpout spout = new MqttSpout(new StringMessageMapper(), options);
+
+        MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() {
+            @Override
+            public MqttMessage toMessage(ITuple tuple) {
+                LOG.info("Received: " + tuple);
+                return new MqttMessage("/integration-result", "hello mqtt".getBytes());
+            }
+        });
+
+        builder.setSpout("mqtt-spout", spout);
+        builder.setBolt("mqtt-bolt", bolt).shuffleGrouping("mqtt-spout");
+
+        return builder.createTopology();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/examples/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/examples/pom.xml b/external/storm-mqtt/examples/pom.xml
new file mode 100644
index 0000000..7576a54
--- /dev/null
+++ b/external/storm-mqtt/examples/pom.xml
@@ -0,0 +1,116 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>storm-mqtt-examples</artifactId>
+ <packaging>jar</packaging>
+
+ <name>storm-mqtt-examples</name>
+
+ <parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-mqtt-parent</artifactId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-mqtt</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <!--<scope>provided</scope>-->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>1.10</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>5.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-mqtt</artifactId>
+ <version>5.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <version>5.9.0</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.storm.flux.Flux</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/examples/src/main/flux/sample.yaml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/examples/src/main/flux/sample.yaml b/external/storm-mqtt/examples/src/main/flux/sample.yaml
new file mode 100644
index 0000000..5748eaa
--- /dev/null
+++ b/external/storm-mqtt/examples/src/main/flux/sample.yaml
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "mqtt-topology"
+
+components:
+ ########## MQTT Spout Config ############
+ - id: "mqtt-type"
+ className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+ - id: "mqtt-options"
+ className: "org.apache.storm.mqtt.common.MqttOptions"
+ properties:
+ - name: "url"
+ value: "tcp://localhost:1883"
+ - name: "topics"
+ value:
+ - "/users/tgoetz/#"
+
+# topology configuration
+config:
+ topology.workers: 1
+ topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+ - id: "mqtt-spout"
+ className: "org.apache.storm.mqtt.spout.MqttSpout"
+ constructorArgs:
+ - ref: "mqtt-type"
+ - ref: "mqtt-options"
+ parallelism: 1
+
+# bolt definitions
+bolts:
+
+ - id: "log"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+
+streams:
+
+ - from: "mqtt-spout"
+ to: "log"
+ grouping:
+ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/examples/src/main/flux/ssl-sample.yaml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/examples/src/main/flux/ssl-sample.yaml b/external/storm-mqtt/examples/src/main/flux/ssl-sample.yaml
new file mode 100644
index 0000000..bfb668d
--- /dev/null
+++ b/external/storm-mqtt/examples/src/main/flux/ssl-sample.yaml
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "mqtt-topology"
+
+components:
+ ########## MQTT Spout Config ############
+ - id: "mqtt-type"
+ className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+ - id: "keystore-loader"
+ className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
+ constructorArgs:
+ - "keystore.jks"
+ - "truststore.jks"
+ properties:
+ - name: "keyPassword"
+ value: "password"
+ - name: "keyStorePassword"
+ value: "password"
+ - name: "trustStorePassword"
+ value: "password"
+
+ - id: "mqtt-options"
+ className: "org.apache.storm.mqtt.common.MqttOptions"
+ properties:
+ - name: "url"
+ value: "ssl://raspberrypi.local:8883"
+ - name: "topics"
+ value:
+ - "/users/tgoetz/#"
+
+# topology configuration
+config:
+ topology.workers: 1
+ topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+ - id: "mqtt-spout"
+ className: "org.apache.storm.mqtt.spout.MqttSpout"
+ constructorArgs:
+ - ref: "mqtt-type"
+ - ref: "mqtt-options"
+ - ref: "keystore-loader"
+ parallelism: 1
+
+# bolt definitions
+bolts:
+
+ - id: "log"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+
+streams:
+
+ - from: "mqtt-spout"
+ to: "log"
+ grouping:
+ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/f157ab42/external/storm-mqtt/examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java b/external/storm-mqtt/examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
new file mode 100644
index 0000000..63ec6fe
--- /dev/null
+++ b/external/storm-mqtt/examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.examples;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.MqttMessageMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example mapper for sensor readings.
+ * <p>
+ * Given a topic name: "/users/{user}/{location}/{deviceId}"
+ * and a payload of "{temperature}/{humidity}"
+ * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
+ * <p>
+ * Messages whose topic or payload do not have that shape are rejected with an
+ * {@link IllegalArgumentException} that names the offending value.
+ */
+public class CustomMessageMapper implements MqttMessageMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class);
+
+
+    /**
+     * Converts an MQTT message into tuple values.
+     *
+     * @param message the received message
+     * @return values in the order declared by {@link #outputFields()}
+     * @throws IllegalArgumentException if the topic has fewer than four path segments after the
+     *         leading slash, the payload has fewer than two "/"-separated parts, or a part is
+     *         not a parsable float (as {@link NumberFormatException})
+     */
+    @Override
+    public Values toValues(MqttMessage message) {
+        String topic = message.getTopic();
+        // The leading "/" yields an empty first element, so the user is at index 2,
+        // the location at index 3 and the device id at index 4.
+        String[] topicElements = topic.split("/");
+        if (topicElements.length < 5) {
+            throw new IllegalArgumentException(
+                    "Unexpected topic, expected /users/{user}/{location}/{deviceId} but was: " + topic);
+        }
+
+        // Decode explicitly as UTF-8 so parsing does not depend on the JVM's default charset.
+        String payload = new String(message.getMessage(), java.nio.charset.Charset.forName("UTF-8"));
+        String[] payloadElements = payload.split("/");
+        if (payloadElements.length < 2) {
+            throw new IllegalArgumentException(
+                    "Unexpected payload, expected {temperature}/{humidity} but was: " + payload);
+        }
+
+        String user = topicElements[2];
+        String location = topicElements[3];
+        String deviceId = topicElements[4];
+        return new Values(user, deviceId, location,
+                Float.parseFloat(payloadElements[0]), Float.parseFloat(payloadElements[1]));
+    }
+
+    /**
+     * Declares the fields emitted for each message.
+     *
+     * @return the fields user, deviceId, location, temperature and humidity, in that order
+     */
+    @Override
+    public Fields outputFields() {
+        return new Fields("user", "deviceId", "location", "temperature", "humidity");
+    }
+}