You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2017/07/19 23:50:03 UTC
bahir git commit: [BAHIR-100] Enhance MQTT connector to support byte
arrays
Repository: bahir
Updated Branches:
refs/heads/master dca8d4c2d -> e3d9e6960
[BAHIR-100] Enhance MQTT connector to support byte arrays
Closes #47
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/e3d9e696
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/e3d9e696
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/e3d9e696
Branch: refs/heads/master
Commit: e3d9e6960941696ba073735e9d039c85146c217a
Parents: dca8d4c
Author: drosenst <da...@intel.com>
Authored: Wed Jul 5 23:41:02 2017 +0300
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Jul 19 16:47:26 2017 -0700
----------------------------------------------------------------------
streaming-mqtt/README.md | 3 +
.../spark/streaming/mqtt/MQTTInputDStream.scala | 1 +
.../mqtt/MQTTPairedByteArrayInputDStream.scala | 144 ++++++++
.../streaming/mqtt/MQTTPairedInputDStream.scala | 1 +
.../apache/spark/streaming/mqtt/MQTTUtils.scala | 366 ++++++++++++++-----
.../streaming/mqtt/JavaMQTTStreamSuite.java | 20 +-
.../spark/streaming/mqtt/MQTTStreamSuite.scala | 26 ++
7 files changed, 477 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 6b89136..eb08b51 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -52,12 +52,14 @@ this actor can be configured to handle failures, etc.
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topic)
+ val lines = MQTTUtils.createPairedByteArrayStreamStream(ssc, brokerUrl, topic)
Additional mqtt connection options can be provided:
```Scala
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val lines = MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
```
### Java API
@@ -67,5 +69,6 @@ this actor can be configured to handle failures, etc.
JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic);
JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics);
+ JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics);
See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 328656b..cf27440 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that subscribe messages from a Mqtt Broker.
* Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param _ssc Spark Streaming StreamingContext
* @param brokerUrl Url of remote mqtt publisher
* @param topic topic name to subscribe to
* @param storageLevel RDD storage level.
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala
new file mode 100644
index 0000000..07c0b18
--- /dev/null
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Input stream that subscribe messages from a Mqtt Broker.
+ * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param _ssc: Spark Streaming StreamingContext,
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topics topic name Array to subscribe to
+ * @param storageLevel RDD storage level.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+private[streaming] class MQTTPairedByteArrayInputDStream(
+ _ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: Option[String] = None,
+ username: Option[String] = None,
+ password: Option[String] = None,
+ cleanSession: Option[Boolean] = None,
+ qos: Option[Int] = None,
+ connectionTimeout: Option[Int] = None,
+ keepAliveInterval: Option[Int] = None,
+ mqttVersion: Option[Int] = None) extends ReceiverInputDStream[(String, Array[Byte])](_ssc) {
+
+ private[streaming] override def name: String = s"MQTT stream [$id]"
+
+ def getReceiver(): Receiver[(String, Array[Byte])] = {
+ new MQTTByteArrayPairReceiver(brokerUrl, topics, storageLevel, clientId, username,
+ password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+ }
+}
+
+private[streaming] class MQTTByteArrayPairReceiver(
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: Option[String],
+ username: Option[String],
+ password: Option[String],
+ cleanSession: Option[Boolean],
+ qos: Option[Int],
+ connectionTimeout: Option[Int],
+ keepAliveInterval: Option[Int],
+ mqttVersion: Option[Int]) extends Receiver[(String, Array[Byte])](storageLevel) {
+
+ def onStop() {
+
+ }
+
+ def onStart() {
+
+ // Set up persistence for messages
+ val persistence = new MemoryPersistence()
+
+ // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
+ val client = new MqttClient(brokerUrl, clientId.getOrElse(MqttClient.generateClientId()),
+ persistence)
+
+ // Initialize mqtt parameters
+ val mqttConnectionOptions = new MqttConnectOptions()
+ if (username.isDefined && password.isDefined) {
+ mqttConnectionOptions.setUserName(username.get)
+ mqttConnectionOptions.setPassword(password.get.toCharArray)
+ }
+ mqttConnectionOptions.setCleanSession(cleanSession.getOrElse(true))
+ if (connectionTimeout.isDefined) {
+ mqttConnectionOptions.setConnectionTimeout(connectionTimeout.get)
+ }
+ if (keepAliveInterval.isDefined) {
+ mqttConnectionOptions.setKeepAliveInterval(keepAliveInterval.get)
+ }
+ if (mqttVersion.isDefined) {
+ mqttConnectionOptions.setMqttVersion(mqttVersion.get)
+ }
+
+ // Callback automatically triggers as and when new message arrives on specified topic
+ val callback = new MqttCallback() {
+
+ // Handles Mqtt message
+ override def messageArrived(topic: String, message: MqttMessage) {
+ store((topic, message.getPayload()))
+ }
+
+ override def deliveryComplete(token: IMqttDeliveryToken) {
+ }
+
+ override def connectionLost(cause: Throwable) {
+ restart("Connection lost ", cause)
+ }
+ }
+
+ // Set up callback for MqttClient. This needs to happen before
+ // connecting or subscribing, otherwise messages may be lost
+ client.setCallback(callback)
+
+ // Connect to MqttBroker
+ client.connect(mqttConnectionOptions)
+
+ // Subscribe to Mqtt topic
+ var i = 0
+ val qosArray = Array.ofDim[Int](topics.length)
+ for (i <- qosArray.indices) {
+ qosArray(i) = qos.getOrElse(1)
+ }
+ client.subscribe(topics, qosArray)
+
+ }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
index 050777b..ec89ed7 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that subscribe messages from a Mqtt Broker.
* Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param _ssc Spark Streaming StreamingContext
* @param brokerUrl Url of remote mqtt publisher
* @param topics topic name Array to subscribe to
* @param storageLevel RDD storage level.
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 0accb80..f42275f 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -215,22 +215,39 @@ object MQTTUtils {
new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
}
+/**
+* Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param ssc StreamingContext object
- * @param brokerUrl Url of remote MQTT publisher
- * @param topics Array of topic names to subscribe to
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param clientId ClientId to use for the mqtt connection
- * @param username Username for authentication to the mqtt publisher
- * @param password Password for authentication to the mqtt publisher
- * @param cleanSession Sets the mqtt cleanSession parameter
- * @param qos Quality of service to use for the topic subscription
- * @param connectionTimeout Connection timeout for the mqtt connection
- * @param keepAliveInterval Keepalive interal for the mqtt connection
- * @param mqttVersion Version to use for the mqtt connection
- */
+ def createPairedByteArrayStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[(String, Array[Byte])] = {
+ new MQTTPairedByteArrayInputDStream(ssc, brokerUrl, topics, storageLevel)
+ }
+
+/**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
def createPairedStream(
ssc: StreamingContext,
brokerUrl: String,
@@ -246,57 +263,130 @@ object MQTTUtils {
mqttVersion: Option[Int]
): ReceiverInputDStream[(String, String)] = {
new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password,
- cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+ cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
}
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param brokerUrl Url of remote MQTT publisher
- * @param topics Array of topic names to subscribe to
- */
+/**
+* Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createPairedByteArrayStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: Option[String],
+ username: Option[String],
+ password: Option[String],
+ cleanSession: Option[Boolean],
+ qos: Option[Int],
+ connectionTimeout: Option[Int],
+ keepAliveInterval: Option[Int],
+ mqttVersion: Option[Int]
+ ): ReceiverInputDStream[(String, Array[Byte])] = {
+ new MQTTPairedByteArrayInputDStream(ssc, brokerUrl, topics, storageLevel,
+ clientId, username, password, cleanSession, qos, connectionTimeout,
+ keepAliveInterval, mqttVersion)
+ }
+
+/**
+* Create an input stream that receives messages pushed by a MQTT publisher.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ */
def createPairedStream(
- jssc: JavaStreamingContext,
- brokerUrl: String,
- topics: Array[String]
- ): JavaReceiverInputDStream[(String, String)] = {
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String]
+ ): JavaReceiverInputDStream[(String, String)] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createPairedStream(jssc.ssc, brokerUrl, topics)
}
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param jssc JavaStreamingContext object
- * @param brokerUrl Url of remote MQTT publisher
- * @param topics Array of topic names to subscribe to
- * @param storageLevel RDD storage level.
- */
+/**
+* Create an input stream that receives messages pushed by a MQTT publisher.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ */
+ def createPairedByteArrayStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String]
+ ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedByteArrayStream(jssc.ssc, brokerUrl, topics)
+ }
+
+/**
+* Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level.
+ */
def createPairedStream(
- jssc: JavaStreamingContext,
- brokerUrl: String,
- topics: Array[String],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[(String, String)] = {
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[(String, String)] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel)
}
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param jssc JavaStreamingContext object
- * @param brokerUrl Url of remote MQTT publisher
- * @param topics Array of topic names to subscribe to
- * @param storageLevel RDD storage level.
- * @param clientId ClientId to use for the mqtt connection
- * @param username Username for authentication to the mqtt publisher
- * @param password Password for authentication to the mqtt publisher
- * @param cleanSession Sets the mqtt cleanSession parameter
- * @param qos Quality of service to use for the topic subscription
- * @param connectionTimeout Connection timeout for the mqtt connection
- * @param keepAliveInterval Keepalive interal for the mqtt connection
- * @param mqttVersion Version to use for the mqtt connection
- */
+/**
+* Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+ def createPairedByteArrayStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, storageLevel)
+ }
+
+/**
+* Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
def createPairedStream(
jssc: JavaStreamingContext,
brokerUrl: String,
@@ -317,20 +407,57 @@ object MQTTUtils {
Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
}
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param jssc JavaStreamingContext object
- * @param brokerUrl Url of remote MQTT publisher
- * @param topics Array of topic names to subscribe to
- * @param clientId ClientId to use for the mqtt connection
- * @param username Username for authentication to the mqtt publisher
- * @param password Password for authentication to the mqtt publisher
- * @param cleanSession Sets the mqtt cleanSession parameter
- * @param qos Quality of service to use for the topic subscription
- * @param connectionTimeout Connection timeout for the mqtt connection
- * @param keepAliveInterval Keepalive interal for the mqtt connection
- * @param mqttVersion Version to use for the mqtt connection
- */
+/**
+* Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createPairedByteArrayStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: String,
+ username: String,
+ password: String,
+ cleanSession: Boolean,
+ qos: Int,
+ connectionTimeout: Int,
+ keepAliveInterval: Int,
+ mqttVersion: Int
+ ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, storageLevel, Option(clientId),
+ Option(username), Option(password), Option(cleanSession), Option(qos),
+ Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+ }
+
+/**
+* Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
def createPairedStream(
jssc: JavaStreamingContext,
brokerUrl: String,
@@ -346,20 +473,56 @@ object MQTTUtils {
): JavaReceiverInputDStream[(String, String)] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
- Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos),
- Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+ Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos),
+ Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
}
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param jssc JavaStreamingContext object
- * @param brokerUrl Url of remote MQTT publisher
- * @param topics Array of topic names to subscribe to
- * @param clientId ClientId to use for the mqtt connection
- * @param username Username for authentication to the mqtt publisher
- * @param password Password for authentication to the mqtt publisher
- * @param cleanSession Sets the mqtt cleanSession parameter
- */
+/**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createPairedByteArrayStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ clientId: String,
+ username: String,
+ password: String,
+ cleanSession: Boolean,
+ qos: Int,
+ connectionTimeout: Int,
+ keepAliveInterval: Int,
+ mqttVersion: Int
+ ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
+ Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos),
+ Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+ }
+
+
+/**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ */
def createPairedStream(
jssc: JavaStreamingContext,
brokerUrl: String,
@@ -368,12 +531,40 @@ object MQTTUtils {
username: String,
password: String,
cleanSession: Boolean
- ): JavaReceiverInputDStream[(String, String)] = {
+ ): JavaReceiverInputDStream[(String, String)] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
- Option(clientId), Option(username), Option(password), Option(cleanSession), None,
- None, None, None)
+ Option(clientId), Option(username), Option(password), Option(cleanSession), None,
+ None, None, None)
}
+
+
+/**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ */
+ def createPairedByteArrayStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ clientId: String,
+ username: String,
+ password: String,
+ cleanSession: Boolean
+ ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
+ Option(clientId), Option(username), Option(password), Option(cleanSession), None,
+ None, None, None)
+ }
+
}
/**
@@ -398,4 +589,13 @@ private[mqtt] class MQTTUtilsPythonHelper {
): JavaDStream[(String, String)] = {
MQTTUtils.createPairedStream(jssc, brokerUrl, topics, storageLevel)
}
+
+ def createPairedByteArrayStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[(String, Array[Byte])] = {
+ MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics, storageLevel)
+ }
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index d320595..e30d187 100644
--- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -49,8 +49,26 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user",
"password", true, 1, 10, 30, 3);
JavaReceiverInputDStream<Tuple2<String, String>> test9 = MQTTUtils.createPairedStream(ssc,
- brokerUrl, topics, "testid", "user", "password", true, 1, 10, 30, 3);
+ brokerUrl, topics, "testid", "user", "password", true, 1,
+ 10, 30, 3);
JavaReceiverInputDStream<Tuple2<String, String>> test10 = MQTTUtils.createPairedStream(ssc,
brokerUrl, topics, "testid", "user", "password", true);
+ JavaReceiverInputDStream<Tuple2<String, byte[]>> test11 =
+ MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics);
+ JavaReceiverInputDStream<Tuple2<String, byte[]>> test12 =
+ MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<Tuple2<String, byte[]>> test13 =
+ MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics,
+ StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user",
+ "password", true, 1, 10, 30, 3);
+ JavaReceiverInputDStream<Tuple2<String, byte[]>> test14 =
+ MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics,
+ "testid", "user", "password", true,
+ 1, 10, 30, 3);
+ JavaReceiverInputDStream<Tuple2<String, byte[]>> test15 =
+ MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, "testid",
+ "user", "password", true);
+
}
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index f1d9a20..6ef551b 100644
--- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -101,4 +101,30 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
}
ssc.stop()
}
+
+ test("mqtt input stream3") {
+ val sendMessage1 = "MQTT demo for spark streaming1"
+ val sendMessage2 = "MQTT demo for spark streaming2"
+ val receiveStream2 = MQTTUtils.createPairedByteArrayStream(ssc,
+ "tcp://" + mqttTestUtils.brokerUri, topics, StorageLevel.MEMORY_ONLY)
+
+ @volatile var receiveMessage: List[String] = List()
+ receiveStream2.foreachRDD { rdd =>
+ if (rdd.collect.length > 0) {
+ receiveMessage = receiveMessage ::: List(new String(rdd.first()._2))
+ receiveMessage
+ }
+ }
+
+ ssc.start()
+
+ // Retry it because we don't know when the receiver will start.
+ eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+ mqttTestUtils.publishData(topics(0), sendMessage1)
+ mqttTestUtils.publishData(topics(1), sendMessage2)
+ assert(receiveMessage.contains(sendMessage1)||receiveMessage.contains(sendMessage2))
+ }
+ ssc.stop()
+ }
+
}