Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/20 23:06:20 UTC

[07/18] storm git commit: update documentation

update documentation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/526a43b8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/526a43b8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/526a43b8

Branch: refs/heads/1.x-branch
Commit: 526a43b88c7fee8501c3c0d29285f24cb6bbe303
Parents: 4b61308
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Jan 8 14:18:59 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jan 8 14:18:59 2016 -0500

----------------------------------------------------------------------
 external/storm-mqtt/README.md                   | 204 ++++++++++++++++++-
 .../apache/storm/mqtt/MqttMessageMapper.java    |   2 +-
 .../mqtt/examples/CustomMessageMapper.java      |   5 +-
 3 files changed, 202 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/526a43b8/external/storm-mqtt/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/README.md b/external/storm-mqtt/README.md
index a67aa3c..b42f627 100644
--- a/external/storm-mqtt/README.md
+++ b/external/storm-mqtt/README.md
@@ -4,7 +4,8 @@
 
 MQTT is a lightweight publish/subscribe protocol frequently used in IoT applications.
 
-Further information can be found at http://mqtt.org
+Further information can be found at http://mqtt.org. The HiveMQ website has a great series on 
+[MQTT Essentials](http://www.hivemq.com/mqtt-essentials/).
 
 Features include:
 
@@ -18,9 +19,10 @@ Features include:
 
 
 ## Quick Start
-To simply see MQTT integration in action, follow the instructions below.
+To quickly see MQTT integration in action, follow the instructions below.
 
 **Start a MQTT broker and publisher**
+
 The command below will create an MQTT broker on port 1883, and start a publsher that will publish random 
 temperature/humidity values to an MQTT topic.
 
@@ -31,6 +33,7 @@ java -cp examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.mqt
 ```
 
 **Run the example toplogy**
+
 Run the sample topology using Flux. This will start a local mode cluster and topology that consists of the MQTT Spout
 publishing to a bolt that simply logs the information it receives.
 
@@ -49,12 +52,12 @@ You should see data from MQTT being logged by the bolt:
 27049 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0}
 27059 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0}
 27069 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0}
-2
 ```
 
 Either allow the local cluster to exit, or stop it by typing Cntrl-C.
 
 **MQTT Fault Tolerance In Action**
+
 After the toplogy has been shutdown, the MQTT subscription created by the MQTT spout will persist with the broker,
 and it will continue to receive and queue messages (as long as the broker is running).
 
@@ -64,13 +67,12 @@ rate of about two messages per second.
 
 This happens because, by default, the MQTT Spout creates a *session* when it subscribes -- that means it requests that
 the broker hold onto and redeliver any messages it missed while offline. Another important factor is the the 
-`MqttBrokerPublisher` publishes messages with a MQTT QoS of `1`, meaning *at most once delivery*.
+`MqttBrokerPublisher` publishes messages with a MQTT QoS of `1`, meaning *at least once delivery*.
 
 For more information about MQTT fault tolerance, see the **Delivery Guarantees** section below.
 
 
 
-
 ## Delivery Guarantees
 In Storm terms, ***the MQTT Spout provides at least once delivery***, depending on the configuration of the publisher as
 well as the MQTT spout.
@@ -90,7 +92,7 @@ So resiliancy is ultimately dependent on the underlying MQTT implementation and
 ###Recommendations
 
 *You will never get at exactly once processing with this spout. It can be used with Trident, but it won't provide 
-transational semantics.*
+transactional semantics. You will only get at least once guarantees.*
 
 If you need reliability guarantees (i.e. *at least once processing*):
 
@@ -108,6 +110,90 @@ man-made disasters and network partitions. Incineration and destruction happens.
 ## Configuration
 For the full range of configuration options, see the JavaDoc for `org.apache.storm.mqtt.common.MqttOptions`.
 
+### Message Mappers
+To define how MQTT messages are mapped to Storm tuples, you configure the MQTT spout with an implementation of the 
+`org.apache.storm.mqtt.MqttMessageMapper` interface, which looks like this:
+
+```java
+public interface MqttMessageMapper extends Serializable {
+
+    Values toValues(MqttMessage message);
+
+    Fields outputFields();
+}
+```
+
+The `MqttMessage` class contains the topic to which the message was published (`String`) and the message payload 
+(`byte[]`). For example, here is a `MqttMessageMapper` implementation that produces tuples based on the content of both
+the message topic and payload:
+
+```java
+/**
+ * Given a topic name: "users/{user}/{location}/{deviceId}"
+ * and a payload of "{temperature}/{humidity}"
+ * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
+ *
+ */
+public class CustomMessageMapper implements MqttMessageMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class);
+
+
+    public Values toValues(MqttMessage message) {
+        String topic = message.getTopic();
+        String[] topicElements = topic.split("/");
+        String[] payloadElements = new String(message.getMessage()).split("/");
+
+        return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]), 
+                Float.parseFloat(payloadElements[1]));
+    }
+
+    public Fields outputFields() {
+        return new Fields("user", "deviceId", "location", "temperature", "humidity");
+    }
+}
+```
+
+### Tuple Mappers
+When publishing MQTT messages with the MQTT bolt or Trident function, you need to map Storm tuple data to MQTT messages 
+(topic/payload). This is done by implementing the `org.apache.storm.mqtt.MqttTupleMapper` interface:
+
+```java
+public interface MqttTupleMapper extends Serializable{
+
+    MqttMessage toMessage(ITuple tuple);
+
+}
+```
+
+For example, a simple `MqttTupleMapper` implementation might look like this:
+
+```java
+public class MyTupleMapper implements MqttTupleMapper {
+    public MqttMessage toMessage(ITuple tuple) {
+        String topic = "users/" + tuple.getStringByField("userId") + "/" + tuple.getStringByField("device");
+        byte[] payload = tuple.getStringByField("message").getBytes();
+        return new MqttMessage(topic, payload);
+    }
+}
+```
+
+### MQTT Spout Parallelism
+It's recommended that you use a parallelism of 1 for the MQTT spout, otherwise you will end up with multiple instances
+of the spout subscribed to the same topic(s), resulting in duplicate messages.
+
+If you want to parallelize the spout, it's recommended that you use multiple instances of the spout in your topology 
+and use MQTT topic selectors to partition the data. How you implement the partitioning strategy is ultimately determined 
+by your MQTT topic structure. As an example, if you had topics partitioned by region (e.g. east/west) you could do 
+something like the following:
+
+```java
+String spout1Topic = "users/east/#";
+String spout2Topic = "users/west/#";
+```
+
+and then join the resulting streams together by subscribing a bolt to each stream.
+
+
 ### Using Flux
 
 The following Flux YAML configuration creates the toplolgy used in the example:
@@ -176,4 +262,110 @@ builder.setSpout("mqtt-spout", spout);
 builder.setBolt("log-bolt", bolt).shuffleGrouping("mqtt-spout");
 
 return builder.createTopology();
+```
+
+## SSL/TLS
+If the MQTT broker you are connecting to requires SSL or SSL client authentication, you need to configure the spout 
+with an appropriate URI, and the location of keystore/truststore files containing the necessary certificates.
+
+### SSL/TLS URIs
+To connect over SSL/TLS use a URI with a prefix of `ssl://` or `tls://` instead of `tcp://`. For further control over
+the protocol, you can specify a particular version:
+
+ * `ssl://` Use the JVM default version of the SSL protocol.
+ * `sslv*://` Use a specific version of the SSL protocol, where `*` is replaced by the version (e.g. `sslv3://`)
+ * `tls://` Use the JVM default version of the TLS protocol.
+ * `tlsv*://` Use a specific version of the TLS protocol, where `*` is replaced by the version (e.g. `tlsv1.1://`)
+ 
+ 
+### Specifying Keystore/Truststore Locations
+ 
+ The `MqttSpout`, `MqttBolt` and `MqttPublishFunction` all have constructors that take a `KeyStoreLoader` instance that
+ is used to load the certificates required for TLS/SSL connections. For example:
+ 
+```java
+ public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader)
+```
+ 
+The `DefaultKeyStoreLoader` class can be used to load certificates from the local filesystem. Note that the 
+keystore/truststore need to be available on all worker nodes where the spout/bolt might be executed. To use 
+`DefaultKeyStoreLoader` you specify the location of the keystore/truststore file(s), and set the necessary passwords:
+
+```java
+DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks", "/path/to/truststore.jks");
+ksl.setKeyStorePassword("password");
+ksl.setTrustStorePassword("password");
+//...
+```
+
+If your keystore/truststore certificates are stored in a single file, you can use the one-argument constructor:
+
+```java
+DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks");
+ksl.setKeyStorePassword("password");
+//...
+```
+
+SSL/TLS can also be configured using Flux:
+
+```yaml
+name: "mqtt-topology"
+
+components:
+   ########## MQTT Spout Config ############
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "keystore-loader"
+    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
+    constructorArgs:
+      - "keystore.jks"
+      - "truststore.jks"
+    properties:
+      - name: "keyPassword"
+        value: "password"
+      - name: "keyStorePassword"
+        value: "password"
+      - name: "trustStorePassword"
+        value: "password"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "ssl://raspberrypi.local:8883"
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"
+      - ref: "mqtt-options"
+      - ref: "keystore-loader"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+
+streams:
+
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+
 ```
\ No newline at end of file
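
Note on the "MQTT Spout Parallelism" section of the README above: the partitioning advice relies on the two topic filters (`users/east/#` and `users/west/#`) never matching the same topic. The sketch below is a simplified matcher for the MQTT `+` and `#` wildcards, written only to illustrate that point. The class `TopicFilterDemo` and the sample topic names are hypothetical and not part of storm-mqtt, and the matcher ignores special cases such as topics beginning with `$`.

```java
// Hypothetical illustration only; storm-mqtt delegates matching to the broker.
class TopicFilterDemo {

    // Returns true if the topic name matches the MQTT topic filter.
    // "+" matches exactly one level; "#" matches the parent and all levels below it.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard consumes the rest of the topic
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        String topic = "users/east/alice/dev1"; // sample topic, assumed for the demo
        System.out.println("east filter: " + matches("users/east/#", topic));
        System.out.println("west filter: " + matches("users/west/#", topic));
    }
}
```

Because the two filters differ at a literal level (`east` vs. `west`), a given topic can satisfy at most one of them, so each spout instance would receive a disjoint share of the messages.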

http://git-wip-us.apache.org/repos/asf/storm/blob/526a43b8/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
index 316e83b..3004cd4 100644
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
  */
 public interface MqttMessageMapper extends Serializable {
     /**
-     * Convert a AckableMessage to a set of Values that can be emitted as a Storm Tuple.
+     * Convert an {@code MqttMessage} to a set of Values that can be emitted as a Storm Tuple.
      *
      * @param message An MQTT Message.
      * @return Values representing a Storm Tuple.

http://git-wip-us.apache.org/repos/asf/storm/blob/526a43b8/external/storm-mqtt/examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java b/external/storm-mqtt/examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
index 63ec6fe..24632fe 100644
--- a/external/storm-mqtt/examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
+++ b/external/storm-mqtt/examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Given a topic name: "/users/{user}/{location}/{deviceId}"
+ * Given a topic name: "users/{user}/{location}/{deviceId}"
  * and a payload of "{temperature}/{humidity}"
  * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
  *
@@ -39,7 +39,8 @@ public class CustomMessageMapper implements MqttMessageMapper {
         String[] topicElements = topic.split("/");
         String[] payloadElements = new String(message.getMessage()).split("/");
 
-        return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]), Float.parseFloat(payloadElements[1]));
+        return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]),
+                Float.parseFloat(payloadElements[1]));
     }
 
     public Fields outputFields() {
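
Note on the `CustomMessageMapper` change above: the Javadoc now describes the topic as `users/{user}/{location}/{deviceId}` (no leading slash), while `toValues` still reads `topicElements[2]`, `[4]` and `[3]`. Those indices appear to assume a leading slash, as in the `/users/tgoetz/#` subscription in the Flux example. The sketch below reproduces only the indexing, using a hypothetical `TopicIndexDemo` class and a sample topic assumed from the logged values in the README; it is not code from the commit.

```java
import java.util.Arrays;

// Hypothetical demo class, not part of storm-mqtt.
class TopicIndexDemo {

    // Mirrors the indexing in CustomMessageMapper: user, deviceId, location.
    static String[] extract(String topic) {
        String[] e = topic.split("/");
        return new String[] { e[2], e[4], e[3] };
    }

    public static void main(String[] args) {
        // With a leading slash, split yields an empty first element followed by
        // "users", "tgoetz", "office", "1234", so indices 2, 4 and 3 are valid.
        System.out.println(Arrays.toString(extract("/users/tgoetz/office/1234")));

        // Without the leading slash there are only four elements (indices 0..3),
        // so reading index 4 fails.
        try {
            extract("users/tgoetz/office/1234");
        } catch (ArrayIndexOutOfBoundsException ex) {
            System.out.println("no leading slash: " + ex.getClass().getSimpleName());
        }
    }
}
```

If topics really are published without the leading slash, the indices would presumably need to shift down by one; otherwise the earlier Javadoc wording with the leading slash matches the code.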