You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/11/08 04:01:32 UTC
bahir git commit: [BAHIR-181] Add username and password in MQTTUtils
for pyspark
Repository: bahir
Updated Branches:
refs/heads/master be1effaaf -> e79a960fa
[BAHIR-181] Add username and password in MQTTUtils for pyspark
Closes #69
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/e79a960f
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/e79a960f
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/e79a960f
Branch: refs/heads/master
Commit: e79a960fa289ee6caefce43c37355a73d44b5220
Parents: be1effa
Author: zhankeyu <ZisZ>
Authored: Thu Nov 1 17:26:19 2018 +0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Nov 7 19:45:25 2018 -0800
----------------------------------------------------------------------
streaming-mqtt/python/mqtt.py | 20 ++++++++++++
.../apache/spark/streaming/mqtt/MQTTUtils.scala | 33 ++++++++++++++++++++
2 files changed, 53 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/e79a960f/streaming-mqtt/python/mqtt.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py
index ad71baf..68c34b7 100644
--- a/streaming-mqtt/python/mqtt.py
+++ b/streaming-mqtt/python/mqtt.py
@@ -44,6 +44,26 @@ class MQTTUtils(object):
return DStream(jstream, ssc, UTF8Deserializer())
@staticmethod
+ def createStream(ssc, brokerUrl, topic,
+ username, password,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2):
+ """
+ Create an input stream that pulls messages from a Mqtt Broker.
+
+ :param ssc: StreamingContext object
+ :param brokerUrl: Url of remote mqtt publisher
+ :param topic: topic name to subscribe to
+ :param username: username for authentication, either "virtual host name:username" or just "username"
+ :param password: password for authentication to the mqtt broker
+ :param storageLevel: RDD storage level.
+ :return: A DStream object
+ """
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ helper = MQTTUtils._get_helper(ssc._sc)
+ jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel, username, password)
+ return DStream(jstream, ssc, UTF8Deserializer())
+
+ @staticmethod
def createPairedStream(ssc, brokerUrl, topics,
storageLevel=StorageLevel.MEMORY_AND_DISK_2):
"""
http://git-wip-us.apache.org/repos/asf/bahir/blob/e79a960f/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index f42275f..ef0e99f 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -148,6 +148,28 @@ object MQTTUtils {
* @param jssc JavaStreamingContext object
* @param brokerUrl Url of remote MQTT publisher
* @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel,
+ username: String,
+ password: String
+ ): JavaReceiverInputDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, brokerUrl, topic, storageLevel, None, Option(username),
+ Option(password), None, None, None, None, None)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
* @param clientId ClientId to use for the mqtt connection
* @param username Username for authentication to the mqtt publisher
* @param password Password for authentication to the mqtt publisher
@@ -598,4 +620,15 @@ private[mqtt] class MQTTUtilsPythonHelper {
): JavaDStream[(String, Array[Byte])] = {
MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics, storageLevel)
}
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel,
+ username: String,
+ password: String
+ ): JavaDStream[String] = {
+ MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel, username, password)
+ }
}