You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/08/22 19:39:50 UTC
[2/2] bahir git commit: [BAHIR-53] Add new configuration options to
MQTTInputDStream
[BAHIR-53] Add new configuration options to MQTTInputDStream
Add new configuration options to enable secured connections and
other quality of services.
Closes #23
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/28f034f4
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/28f034f4
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/28f034f4
Branch: refs/heads/master
Commit: 28f034f49d19034b596f7f04ca4fc2698a21ad6c
Parents: ab62371
Author: Sebastian Woehrl <se...@maibornwolff.de>
Authored: Sat Aug 13 15:00:13 2016 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Mon Aug 22 12:38:28 2016 -0700
----------------------------------------------------------------------
scalastyle-config.xml | 2 +-
streaming-mqtt/README.md | 23 ++++
.../spark/streaming/mqtt/MQTTInputDStream.scala | 67 ++++++++--
.../apache/spark/streaming/mqtt/MQTTUtils.scala | 129 ++++++++++++++++++-
.../streaming/mqtt/JavaMQTTStreamSuite.java | 6 +
5 files changed, 211 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 1db5977..c6aa3d9 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -94,7 +94,7 @@ This file is divided into 3 sections:
</check>
<check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
- <parameters><parameter name="maxParameters"><![CDATA[10]]></parameter></parameters>
+ <parameters><parameter name="maxParameters"><![CDATA[15]]></parameter></parameters>
</check>
<check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 9482648..2ec0128 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -25,6 +25,23 @@ The `--packages` argument can also be used with `bin/spark-submit`.
This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+## Configuration options.
+
+This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html).
+
+ * `brokerUrl` A url MqttClient connects to. Set this as the url of the Mqtt Server. e.g. tcp://localhost:1883.
+ * `storageLevel` RDD storage level used for storing incoming messages. Defaults to `StorageLevel.MEMORY_AND_DISK_SER_2`.
+ * `topic` Topic MqttClient subscribes to.
+ * `clientId` The client ID this client is associated with. Provide the same value to recover a stopped client.
+ * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
+ * `username` Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors.
+ * `password` Sets the password to use for the connection.
+ * `cleanSession` Setting it true starts a clean session, removes all checkpointed messages by a previous run of this source. This is set to true by default.
+ * `connectionTimeout` Sets the connection timeout, a value of 0 is interpreted as wait until client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
+ * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
+ * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
+
+
## Examples
### Scala API
@@ -34,6 +51,12 @@ this actor can be configured to handle failures, etc.
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+Additional mqtt connection options can be provided:
+
+```Scala
+val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+```
+
### Java API
You need to extend `JavaActorReceiver` so as to store received data into Spark using `store(...)` methods. The supervisor strategy of
http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index cbad6f7..328656b 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -19,10 +19,7 @@ package org.apache.spark.streaming.mqtt
import java.nio.charset.StandardCharsets
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.storage.StorageLevel
@@ -33,23 +30,39 @@ import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that subscribe messages from a Mqtt Broker.
* Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
- * @param brokerUrl Url of remote mqtt publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interval for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
*/
-
private[streaming]
class MQTTInputDStream(
_ssc: StreamingContext,
brokerUrl: String,
topic: String,
- storageLevel: StorageLevel
+ storageLevel: StorageLevel,
+ clientId: Option[String] = None,
+ username: Option[String] = None,
+ password: Option[String] = None,
+ cleanSession: Option[Boolean] = None,
+ qos: Option[Int] = None,
+ connectionTimeout: Option[Int] = None,
+ keepAliveInterval: Option[Int] = None,
+ mqttVersion: Option[Int] = None
) extends ReceiverInputDStream[String](_ssc) {
private[streaming] override def name: String = s"MQTT stream [$id]"
def getReceiver(): Receiver[String] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel)
+ new MQTTReceiver(brokerUrl, topic, storageLevel, clientId, username, password, cleanSession,
+ qos, connectionTimeout, keepAliveInterval, mqttVersion)
}
}
@@ -57,7 +70,15 @@ private[streaming]
class MQTTReceiver(
brokerUrl: String,
topic: String,
- storageLevel: StorageLevel
+ storageLevel: StorageLevel,
+ clientId: Option[String],
+ username: Option[String],
+ password: Option[String],
+ cleanSession: Option[Boolean],
+ qos: Option[Int],
+ connectionTimeout: Option[Int],
+ keepAliveInterval: Option[Int],
+ mqttVersion: Option[Int]
) extends Receiver[String](storageLevel) {
def onStop() {
@@ -70,7 +91,25 @@ class MQTTReceiver(
val persistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
- val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+ val client = new MqttClient(brokerUrl, clientId.getOrElse(MqttClient.generateClientId()),
+ persistence)
+
+ // Initialize mqtt parameters
+ val mqttConnectionOptions = new MqttConnectOptions()
+ if (username.isDefined && password.isDefined) {
+ mqttConnectionOptions.setUserName(username.get)
+ mqttConnectionOptions.setPassword(password.get.toCharArray)
+ }
+ mqttConnectionOptions.setCleanSession(cleanSession.getOrElse(true))
+ if (connectionTimeout.isDefined) {
+ mqttConnectionOptions.setConnectionTimeout(connectionTimeout.get)
+ }
+ if (keepAliveInterval.isDefined) {
+ mqttConnectionOptions.setKeepAliveInterval(keepAliveInterval.get)
+ }
+ if (mqttVersion.isDefined) {
+ mqttConnectionOptions.setMqttVersion(mqttVersion.get)
+ }
// Callback automatically triggers as and when new message arrives on specified topic
val callback = new MqttCallback() {
@@ -93,10 +132,10 @@ class MQTTReceiver(
client.setCallback(callback)
// Connect to MqttBroker
- client.connect()
+ client.connect(mqttConnectionOptions)
// Subscribe to Mqtt topic
- client.subscribe(topic)
+ client.subscribe(topic, qos.getOrElse(1))
}
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 7b8d56d..7e2f5c7 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -41,6 +41,40 @@ object MQTTUtils {
new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
}
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interval for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel,
+ clientId: Option[String],
+ username: Option[String],
+ password: Option[String],
+ cleanSession: Option[Boolean],
+ qos: Option[Int],
+ connectionTimeout: Option[Int],
+ keepAliveInterval: Option[Int],
+ mqttVersion: Option[Int]
+ ): ReceiverInputDStream[String] = {
+ new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password,
+ cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+ }
+
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
@@ -59,7 +93,7 @@ object MQTTUtils {
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
- * @param jssc JavaStreamingContext object
+ * @param jssc JavaStreamingContext object
* @param brokerUrl Url of remote MQTT publisher
* @param topic Topic name to subscribe to
* @param storageLevel RDD storage level.
@@ -73,6 +107,99 @@ object MQTTUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interval for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel,
+ clientId: String,
+ username: String,
+ password: String,
+ cleanSession: Boolean,
+ qos: Int,
+ connectionTimeout: Int,
+ keepAliveInterval: Int,
+ mqttVersion: Int
+ ): JavaReceiverInputDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, brokerUrl, topic, storageLevel, Option(clientId), Option(username),
+ Option(password), Option(cleanSession), Option(qos), Option(connectionTimeout),
+ Option(keepAliveInterval), Option(mqttVersion))
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interval for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topic: String,
+ clientId: String,
+ username: String,
+ password: String,
+ cleanSession: Boolean,
+ qos: Int,
+ connectionTimeout: Int,
+ keepAliveInterval: Int,
+ mqttVersion: Int
+ ): JavaReceiverInputDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
+ Option(username), Option(password), Option(cleanSession), Option(qos),
+ Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topic: String,
+ clientId: String,
+ username: String,
+ password: String,
+ cleanSession: Boolean
+ ): JavaReceiverInputDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
+ Option(username), Option(password), Option(cleanSession), None, None, None, None)
+ }
+
}
/**
http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index ce5aa1e..45332d9 100644
--- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -33,5 +33,11 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<String> test3 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+ StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user", "password", true, 1, 10, 30, 3);
+ JavaReceiverInputDStream<String> test4 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+ "testid", "user", "password", true, 1, 10, 30, 3);
+ JavaReceiverInputDStream<String> test5 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+ "testid", "user", "password", true);
}
}