You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/11/08 04:01:32 UTC

bahir git commit: [BAHIR-181] Add username and password in MQTTUtils for pyspark

Repository: bahir
Updated Branches:
  refs/heads/master be1effaaf -> e79a960fa


[BAHIR-181] Add username and password in MQTTUtils for pyspark

Closes #69


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/e79a960f
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/e79a960f
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/e79a960f

Branch: refs/heads/master
Commit: e79a960fa289ee6caefce43c37355a73d44b5220
Parents: be1effa
Author: zhankeyu <ZisZ>
Authored: Thu Nov 1 17:26:19 2018 +0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Nov 7 19:45:25 2018 -0800

----------------------------------------------------------------------
 streaming-mqtt/python/mqtt.py                   | 20 ++++++++++++
 .../apache/spark/streaming/mqtt/MQTTUtils.scala | 33 ++++++++++++++++++++
 2 files changed, 53 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/e79a960f/streaming-mqtt/python/mqtt.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py
index ad71baf..68c34b7 100644
--- a/streaming-mqtt/python/mqtt.py
+++ b/streaming-mqtt/python/mqtt.py
@@ -44,6 +44,26 @@ class MQTTUtils(object):
         return DStream(jstream, ssc, UTF8Deserializer())
 
     @staticmethod
+    def createStream(ssc, brokerUrl, topic,
+                     username, password,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2):
+        """
+        Create an input stream that pulls messages from a Mqtt Broker.
+
+        :param ssc:  StreamingContext object
+        :param brokerUrl:  Url of remote mqtt publisher
+        :param topic:  topic name to subscribe to
+        :param username:  the virtual host name : username, or just the username
+        :param password:  the password for authentication to the mqtt publisher
+        :param storageLevel:  RDD storage level.
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        helper = MQTTUtils._get_helper(ssc._sc)
+        jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel, username, password)
+        return DStream(jstream, ssc, UTF8Deserializer())
+
+    @staticmethod
     def createPairedStream(ssc, brokerUrl, topics,
                      storageLevel=StorageLevel.MEMORY_AND_DISK_2):
         """

http://git-wip-us.apache.org/repos/asf/bahir/blob/e79a960f/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index f42275f..ef0e99f 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -148,6 +148,28 @@ object MQTTUtils {
    * @param jssc               JavaStreamingContext object
    * @param brokerUrl          Url of remote MQTT publisher
    * @param topic              Topic name to subscribe to
+   * @param storageLevel       RDD storage level.
+   * @param username           Username for authentication to the mqtt publisher
+   * @param password           Password for authentication to the mqtt publisher
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel,
+      username: String,
+      password: String
+    ): JavaReceiverInputDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createStream(jssc.ssc, brokerUrl, topic, storageLevel, None, Option(username),
+        Option(password), None, None, None, None, None)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc               JavaStreamingContext object
+   * @param brokerUrl          Url of remote MQTT publisher
+   * @param topic              Topic name to subscribe to
    * @param clientId           ClientId to use for the mqtt connection
    * @param username           Username for authentication to the mqtt publisher
    * @param password           Password for authentication to the mqtt publisher
@@ -598,4 +620,15 @@ private[mqtt] class MQTTUtilsPythonHelper {
       ): JavaDStream[(String, Array[Byte])] = {
     MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics, storageLevel)
   }
+
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel,
+      username: String,
+      password: String
+    ): JavaDStream[String] = {
+     MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel, username, password)
+  }
 }