You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@bahir.apache.org by ckadner <gi...@git.apache.org> on 2017/03/03 09:46:40 UTC

[GitHub] bahir pull request #37: [BAHIR-89] Multi topic support API for streaming MQT...

Github user ckadner commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/37#discussion_r104118613
  
    --- Diff: streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala ---
    @@ -199,7 +199,181 @@ object MQTTUtils {
         createStream(jssc.ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
           Option(username), Option(password), Option(cleanSession), None, None, None, None)
       }
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT publisher.
    +   * @param ssc           StreamingContext object
    +   * @param brokerUrl     Url of remote MQTT publisher
    +   * @param topics        Array of topic names to subscribe to
    +   * @param storageLevel  RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
    +   */
    +  def createPairedStream(
    +      ssc: StreamingContext,
    +      brokerUrl: String,
    +      topics: Array[String],
    +      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    +    ): ReceiverInputDStream[(String, String)] = {
    +    new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
    +  }
    +
     
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT publisher.
    +   * @param ssc                StreamingContext object
    +   * @param brokerUrl          Url of remote MQTT publisher
    +   * @param topics             Array of topic names to subscribe to
    +   * @param storageLevel       RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
    +   * @param clientId           ClientId to use for the mqtt connection
    +   * @param username           Username for authentication to the mqtt publisher
    +   * @param password           Password for authentication to the mqtt publisher
    +   * @param cleanSession       Sets the mqtt cleanSession parameter
    +   * @param qos                Quality of service to use for the topic subscription
    +   * @param connectionTimeout  Connection timeout for the mqtt connection
    +   * @param keepAliveInterval  Keepalive interval for the mqtt connection
    +   * @param mqttVersion        Version to use for the mqtt connection
    +   */
    +  def createPairedStream(
    +      ssc: StreamingContext,
    +      brokerUrl: String,
    +      topics: Array[String],
    +      storageLevel: StorageLevel,
    +      clientId: Option[String],
    +      username: Option[String],
    +      password: Option[String],
    +      cleanSession: Option[Boolean],
    +      qos: Option[Int],
    +      connectionTimeout: Option[Int],
    +      keepAliveInterval: Option[Int],
    +      mqttVersion: Option[Int]
    +    ): ReceiverInputDStream[(String, String)] = {
    +    new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password,
    +          cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
    +  }
    +
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT publisher.
    +   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    +   * @param jssc      JavaStreamingContext object
    +   * @param brokerUrl Url of remote MQTT publisher
    +   * @param topic     Array of topic names to subscribe to
    --- End diff --
    
    should be `@param topics` (plural)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---