You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@bahir.apache.org by ckadner <gi...@git.apache.org> on 2017/03/03 09:46:40 UTC
[GitHub] bahir pull request #37: [BAHIR-89] Multi topic support API for streaming MQT...
Github user ckadner commented on a diff in the pull request:
https://github.com/apache/bahir/pull/37#discussion_r104118613
--- Diff: streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala ---
@@ -199,7 +199,181 @@ object MQTTUtils {
createStream(jssc.ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
Option(username), Option(password), Option(cleanSession), None, None, None, None)
}
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createPairedStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[(String, String)] = {
+ new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createPairedStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: Option[String],
+ username: Option[String],
+ password: Option[String],
+ cleanSession: Option[Boolean],
+ qos: Option[Int],
+ connectionTimeout: Option[Int],
+ keepAliveInterval: Option[Int],
+ mqttVersion: Option[Int]
+ ): ReceiverInputDStream[(String, String)] = {
+ new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password,
+ cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Array of topic names to subscribe to
--- End diff --
should be `@param topics` (plural)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---