You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by lr...@apache.org on 2017/07/20 01:37:13 UTC
[27/50] [abbrv] incubator-livy-website git commit: [BAHIR-89] Multi
topic API support for streaming MQTT
[BAHIR-89] Multi topic API support for streaming MQTT
New API which accept array of MQTT topics as input
and return Tuple2<TopicName, Message> as output.
It helps consume from multiple MQTT topics with
efficient user of resources.
Closes #37.
Project: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/commit/826545cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/tree/826545cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/diff/826545cb
Branch: refs/heads/master
Commit: 826545cb8db4b89bbdb3927e53f555c0fa15771e
Parents: 8d46b39
Author: Anntinu <an...@gmail.com>
Authored: Mon Feb 27 07:37:07 2017 +0530
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Mar 23 14:32:04 2017 -0700
----------------------------------------------------------------------
.gitattributes | 13 --
.gitignore | 24 ---
streaming-mqtt/README.md | 6 +-
streaming-mqtt/python/mqtt.py | 17 ++
.../streaming/mqtt/MQTTPairedInputDStream.scala | 142 +++++++++++++++
.../apache/spark/streaming/mqtt/MQTTUtils.scala | 182 +++++++++++++++++++
.../streaming/mqtt/JavaMQTTStreamSuite.java | 15 +-
.../spark/streaming/mqtt/MQTTStreamSuite.scala | 25 +++
8 files changed, 385 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/826545cb/.gitattributes
----------------------------------------------------------------------
diff --git a/.gitattributes b/.gitattributes
deleted file mode 100644
index a8edefd..0000000
--- a/.gitattributes
+++ /dev/null
@@ -1,13 +0,0 @@
-# Set the default behavior to have all files normalized to Unix-style
-# line endings upon check-in.
-* text=auto
-
-# Declare files that will always have CRLF line endings on checkout.
-*.bat text eol=crlf
-
-# Denote all files that are truly binary and should not be modified.
-*.dll binary
-*.exp binary
-*.lib binary
-*.pdb binary
-*.exe binary
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/826545cb/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
deleted file mode 100644
index fb6d3b7..0000000
--- a/.gitignore
+++ /dev/null
@@ -1,24 +0,0 @@
-# Mac
-.DS_Store
-
-# Eclipse
-.classpath
-.project
-.settings/
-target/
-
-# Intellij
-.idea/
-.idea_modules/
-*.iml
-*.iws
-*.class
-*.log
-
-# Python
-*.pyc
-
-# Others
-.checkstyle
-.fbExcludeFilterFile
-dependency-reduced-pom.xml
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/826545cb/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 872375d..6b89136 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -32,6 +32,7 @@ This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients
* `brokerUrl` A url MqttClient connects to. Set this as the url of the Mqtt Server. e.g. tcp://localhost:1883.
* `storageLevel` By default it is used for storing incoming messages on disk.
* `topic` Topic MqttClient subscribes to.
+ * `topics` List of topics MqttClient subscribes to.
* `clientId` clientId, this client is assoicated with. Provide the same value to recover a stopped client.
* `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
* `username` Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors.
@@ -50,11 +51,13 @@ You need to extend `ActorReceiver` so as to store received data into Spark using
this actor can be configured to handle failures, etc.
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+ val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topic)
Additional mqtt connection options can be provided:
```Scala
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
```
### Java API
@@ -63,5 +66,6 @@ You need to extend `JavaActorReceiver` so as to store received data into Spark u
this actor can be configured to handle failures, etc.
JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic);
+ JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics);
-See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)
\ No newline at end of file
+See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/826545cb/streaming-mqtt/python/mqtt.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py
index c55b704..da00394 100644
--- a/streaming-mqtt/python/mqtt.py
+++ b/streaming-mqtt/python/mqtt.py
@@ -44,6 +44,23 @@ class MQTTUtils(object):
return DStream(jstream, ssc, UTF8Deserializer())
@staticmethod
+ def createPairedStream(ssc, brokerUrl, topics,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2):
+ """
+ Create an input stream that pulls messages from a Mqtt Broker.
+
+ :param ssc: StreamingContext object
+ :param brokerUrl: Url of remote mqtt publisher
+ :param topics: topic names to subscribe to
+ :param storageLevel: RDD storage level.
+ :return: A DStream object
+ """
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ helper = MQTTUtils._get_helper(ssc._sc)
+ jstream = helper.createStream(ssc._jssc, brokerUrl, topics, jlevel)
+ return DStream(jstream, ssc, UTF8Deserializer())
+
+ @staticmethod
def _get_helper(sc):
try:
return sc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/826545cb/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
new file mode 100644
index 0000000..050777b
--- /dev/null
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import java.nio.charset.StandardCharsets
+
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Input stream that subscribe messages from a Mqtt Broker.
+ * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topics topic name Array to subscribe to
+ * @param storageLevel RDD storage level.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+private[streaming] class MQTTPairedInputDStream(
+ _ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: Option[String] = None,
+ username: Option[String] = None,
+ password: Option[String] = None,
+ cleanSession: Option[Boolean] = None,
+ qos: Option[Int] = None,
+ connectionTimeout: Option[Int] = None,
+ keepAliveInterval: Option[Int] = None,
+ mqttVersion: Option[Int] = None) extends ReceiverInputDStream[(String, String)](_ssc) {
+
+ private[streaming] override def name: String = s"MQTT stream [$id]"
+
+ def getReceiver(): Receiver[(String, String)] = {
+ new MQTTPairReceiver(brokerUrl, topics, storageLevel, clientId, username,
+ password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+ }
+}
+
+private[streaming] class MQTTPairReceiver(
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: Option[String],
+ username: Option[String],
+ password: Option[String],
+ cleanSession: Option[Boolean],
+ qos: Option[Int],
+ connectionTimeout: Option[Int],
+ keepAliveInterval: Option[Int],
+ mqttVersion: Option[Int]) extends Receiver[(String, String)](storageLevel) {
+
+ def onStop() {
+
+ }
+
+ def onStart() {
+
+ // Set up persistence for messages
+ val persistence = new MemoryPersistence()
+
+ // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
+ val client = new MqttClient(brokerUrl, clientId.getOrElse(MqttClient.generateClientId()),
+ persistence)
+
+ // Initialize mqtt parameters
+ val mqttConnectionOptions = new MqttConnectOptions()
+ if (username.isDefined && password.isDefined) {
+ mqttConnectionOptions.setUserName(username.get)
+ mqttConnectionOptions.setPassword(password.get.toCharArray)
+ }
+ mqttConnectionOptions.setCleanSession(cleanSession.getOrElse(true))
+ if (connectionTimeout.isDefined) {
+ mqttConnectionOptions.setConnectionTimeout(connectionTimeout.get)
+ }
+ if (keepAliveInterval.isDefined) {
+ mqttConnectionOptions.setKeepAliveInterval(keepAliveInterval.get)
+ }
+ if (mqttVersion.isDefined) {
+ mqttConnectionOptions.setMqttVersion(mqttVersion.get)
+ }
+
+ // Callback automatically triggers as and when new message arrives on specified topic
+ val callback = new MqttCallback() {
+
+ // Handles Mqtt message
+ override def messageArrived(topic: String, message: MqttMessage) {
+ store((topic, new String(message.getPayload(), StandardCharsets.UTF_8)))
+ }
+
+ override def deliveryComplete(token: IMqttDeliveryToken) {
+ }
+
+ override def connectionLost(cause: Throwable) {
+ restart("Connection lost ", cause)
+ }
+ }
+
+ // Set up callback for MqttClient. This needs to happen before
+ // connecting or subscribing, otherwise messages may be lost
+ client.setCallback(callback)
+
+ // Connect to MqttBroker
+ client.connect(mqttConnectionOptions)
+
+ // Subscribe to Mqtt topic
+ var i = 0;
+ val qosArray = Array.ofDim[Int](topics.length);
+ for (i <- 0 to qosArray.length -1) {
+ qosArray(i) = qos.getOrElse(1);
+ }
+ client.subscribe(topics, qosArray)
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/826545cb/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 7e2f5c7..0accb80 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -199,7 +199,181 @@ object MQTTUtils {
createStream(jssc.ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
Option(username), Option(password), Option(cleanSession), None, None, None, None)
}
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createPairedStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[(String, String)] = {
+ new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createPairedStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: Option[String],
+ username: Option[String],
+ password: Option[String],
+ cleanSession: Option[Boolean],
+ qos: Option[Int],
+ connectionTimeout: Option[Int],
+ keepAliveInterval: Option[Int],
+ mqttVersion: Option[Int]
+ ): ReceiverInputDStream[(String, String)] = {
+ new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password,
+ cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ */
+ def createPairedStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String]
+ ): JavaReceiverInputDStream[(String, String)] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedStream(jssc.ssc, brokerUrl, topics)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+ def createPairedStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[(String, String)] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createPairedStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: String,
+ username: String,
+ password: String,
+ cleanSession: Boolean,
+ qos: Int,
+ connectionTimeout: Int,
+ keepAliveInterval: Int,
+ mqttVersion: Int
+ ): JavaReceiverInputDStream[(String, String)] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel, Option(clientId),
+ Option(username), Option(password), Option(cleanSession), Option(qos),
+ Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createPairedStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ clientId: String,
+ username: String,
+ password: String,
+ cleanSession: Boolean,
+ qos: Int,
+ connectionTimeout: Int,
+ keepAliveInterval: Int,
+ mqttVersion: Int
+ ): JavaReceiverInputDStream[(String, String)] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
+ Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos),
+ Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ */
+ def createPairedStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ clientId: String,
+ username: String,
+ password: String,
+ cleanSession: Boolean
+ ): JavaReceiverInputDStream[(String, String)] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
+ Option(clientId), Option(username), Option(password), Option(cleanSession), None,
+ None, None, None)
+ }
}
/**
@@ -216,4 +390,12 @@ private[mqtt] class MQTTUtilsPythonHelper {
): JavaDStream[String] = {
MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
}
+ def createPairedStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[(String, String)] = {
+ MQTTUtils.createPairedStream(jssc, brokerUrl, topics, storageLevel)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/826545cb/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 45332d9..d320595 100644
--- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -18,16 +18,18 @@
package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
+import scala.Tuple2;
public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
@Test
public void testMQTTStream() {
String brokerUrl = "abc";
String topic = "def";
+ String[] topics = {"def1","def2"};
// tests the API, does not actually test data receiving
JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
@@ -39,5 +41,16 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
"testid", "user", "password", true, 1, 10, 30, 3);
JavaReceiverInputDStream<String> test5 = MQTTUtils.createStream(ssc, brokerUrl, topic,
"testid", "user", "password", true);
+ JavaReceiverInputDStream<Tuple2<String, String>> test6 = MQTTUtils.createPairedStream(ssc,
+ brokerUrl, topics);
+ JavaReceiverInputDStream<Tuple2<String, String>> test7 = MQTTUtils.createPairedStream(ssc,
+ brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<Tuple2<String, String>> test8 = MQTTUtils.createPairedStream(ssc,
+ brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user",
+ "password", true, 1, 10, 30, 3);
+ JavaReceiverInputDStream<Tuple2<String, String>> test9 = MQTTUtils.createPairedStream(ssc,
+ brokerUrl, topics, "testid", "user", "password", true, 1, 10, 30, 3);
+ JavaReceiverInputDStream<Tuple2<String, String>> test10 = MQTTUtils.createPairedStream(ssc,
+ brokerUrl, topics, "testid", "user", "password", true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/826545cb/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index fdcd18c..f1d9a20 100644
--- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -33,6 +33,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
private val master = "local[2]"
private val framework = this.getClass.getSimpleName
private val topic = "def"
+ private val topics = Array("def1", "def2")
private var ssc: StreamingContext = _
private var mqttTestUtils: MQTTTestUtils = _
@@ -76,4 +77,28 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
}
ssc.stop()
}
+ test("mqtt input stream2") {
+ val sendMessage1 = "MQTT demo for spark streaming1"
+ val sendMessage2 = "MQTT demo for spark streaming2"
+ val receiveStream2 = MQTTUtils.createPairedStream(ssc, "tcp://" + mqttTestUtils.brokerUri,
+ topics, StorageLevel.MEMORY_ONLY)
+
+ @volatile var receiveMessage: List[String] = List()
+ receiveStream2.foreachRDD { rdd =>
+ if (rdd.collect.length > 0) {
+ receiveMessage = receiveMessage ::: List(rdd.first()._2)
+ receiveMessage
+ }
+ }
+
+ ssc.start()
+
+ // Retry it because we don't know when the receiver will start.
+ eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+ mqttTestUtils.publishData(topics(0), sendMessage1)
+ mqttTestUtils.publishData(topics(1), sendMessage2)
+ assert(receiveMessage.contains(sendMessage1)||receiveMessage.contains(sendMessage2))
+ }
+ ssc.stop()
+ }
}