You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2017/03/23 21:49:02 UTC

bahir git commit: [BAHIR-89] Multi topic API support for streaming MQTT

Repository: bahir
Updated Branches:
  refs/heads/master 8d46b3961 -> 826545cb8


[BAHIR-89] Multi topic API support for streaming MQTT

New API which accepts an array of MQTT topics as input
and returns Tuple2<TopicName, Message> as output.

It helps consume from multiple MQTT topics with
efficient use of resources.

Closes #37.


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/826545cb
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/826545cb
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/826545cb

Branch: refs/heads/master
Commit: 826545cb8db4b89bbdb3927e53f555c0fa15771e
Parents: 8d46b39
Author: Anntinu <an...@gmail.com>
Authored: Mon Feb 27 07:37:07 2017 +0530
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Mar 23 14:32:04 2017 -0700

----------------------------------------------------------------------
 .gitattributes                                  |  13 --
 .gitignore                                      |  24 ---
 streaming-mqtt/README.md                        |   6 +-
 streaming-mqtt/python/mqtt.py                   |  17 ++
 .../streaming/mqtt/MQTTPairedInputDStream.scala | 142 +++++++++++++++
 .../apache/spark/streaming/mqtt/MQTTUtils.scala | 182 +++++++++++++++++++
 .../streaming/mqtt/JavaMQTTStreamSuite.java     |  15 +-
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  |  25 +++
 8 files changed, 385 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/.gitattributes
----------------------------------------------------------------------
diff --git a/.gitattributes b/.gitattributes
deleted file mode 100644
index a8edefd..0000000
--- a/.gitattributes
+++ /dev/null
@@ -1,13 +0,0 @@
-# Set the default behavior to have all files normalized to Unix-style
-# line endings upon check-in.
-* text=auto
-
-# Declare files that will always have CRLF line endings on checkout.
-*.bat text eol=crlf
-
-# Denote all files that are truly binary and should not be modified.
-*.dll binary
-*.exp binary
-*.lib binary
-*.pdb binary
-*.exe binary

http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
deleted file mode 100644
index fb6d3b7..0000000
--- a/.gitignore
+++ /dev/null
@@ -1,24 +0,0 @@
-# Mac
-.DS_Store
-
-# Eclipse
-.classpath
-.project
-.settings/
-target/
-
-# Intellij
-.idea/
-.idea_modules/
-*.iml
-*.iws
-*.class
-*.log
-
-# Python
-*.pyc
-
-# Others
-.checkstyle
-.fbExcludeFilterFile
-dependency-reduced-pom.xml

http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 872375d..6b89136 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -32,6 +32,7 @@ This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients
  * `brokerUrl` A url MqttClient connects to. Set this as the url of the Mqtt Server. e.g. tcp://localhost:1883.
  * `storageLevel` By default it is used for storing incoming messages on disk.
  * `topic` Topic MqttClient subscribes to.
+ * `topics` List of topics MqttClient subscribes to.
  * `clientId` clientId, this client is assoicated with. Provide the same value to recover a stopped client.
  * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
  * `username` Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors.
@@ -50,11 +51,13 @@ You need to extend `ActorReceiver` so as to store received data into Spark using
 this actor can be configured to handle failures, etc.
 
     val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+    val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics)
 
 Additional mqtt connection options can be provided:
 
 ```Scala
 val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
 ```
 
 ### Java API
@@ -63,5 +66,6 @@ You need to extend `JavaActorReceiver` so as to store received data into Spark u
 this actor can be configured to handle failures, etc.
 
     JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic);
+    JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics);
 
-See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)
\ No newline at end of file
+See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)

http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/python/mqtt.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py
index c55b704..da00394 100644
--- a/streaming-mqtt/python/mqtt.py
+++ b/streaming-mqtt/python/mqtt.py
@@ -44,6 +44,23 @@ class MQTTUtils(object):
         return DStream(jstream, ssc, UTF8Deserializer())
 
     @staticmethod
+    def createPairedStream(ssc, brokerUrl, topics,
+                           storageLevel=StorageLevel.MEMORY_AND_DISK_2):
+        """
+        Create an input stream that pulls messages from a Mqtt Broker.
+
+        :param ssc:  StreamingContext object
+        :param brokerUrl:  Url of remote mqtt publisher
+        :param topics:  topic names to subscribe to
+        :param storageLevel:  RDD storage level.
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        helper = MQTTUtils._get_helper(ssc._sc)
+        jstream = helper.createPairedStream(ssc._jssc, brokerUrl, topics, jlevel)
+        return DStream(jstream, ssc, UTF8Deserializer())
+
+    @staticmethod
     def _get_helper(sc):
         try:
             return sc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()

http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
new file mode 100644
index 0000000..050777b
--- /dev/null
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import java.nio.charset.StandardCharsets
+
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Input stream that subscribes to messages from a Mqtt Broker.
+ * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param brokerUrl          Url of remote mqtt publisher
+ * @param topics             topic name Array to subscribe to
+ * @param storageLevel       RDD storage level.
+ * @param clientId           ClientId to use for the mqtt connection
+ * @param username           Username for authentication to the mqtt publisher
+ * @param password           Password for authentication to the mqtt publisher
+ * @param cleanSession       Sets the mqtt cleanSession parameter
+ * @param qos                Quality of service to use for the topic subscription
+ * @param connectionTimeout  Connection timeout for the mqtt connection
+ * @param keepAliveInterval  Keepalive interval for the mqtt connection
+ * @param mqttVersion        Version to use for the mqtt connection
+ */
+private[streaming] class MQTTPairedInputDStream(
+    _ssc: StreamingContext,
+    brokerUrl: String,
+    topics: Array[String],
+    storageLevel: StorageLevel,
+    clientId: Option[String] = None,
+    username: Option[String] = None,
+    password: Option[String] = None,
+    cleanSession: Option[Boolean] = None,
+    qos: Option[Int] = None,
+    connectionTimeout: Option[Int] = None,
+    keepAliveInterval: Option[Int] = None,
+    mqttVersion: Option[Int] = None) extends ReceiverInputDStream[(String, String)](_ssc) {
+
+  private[streaming] override def name: String = s"MQTT stream [$id]"
+
+  def getReceiver(): Receiver[(String, String)] = {
+    new MQTTPairReceiver(brokerUrl, topics, storageLevel, clientId, username,
+        password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+  }
+}
+
+private[streaming] class MQTTPairReceiver(
+    brokerUrl: String,
+    topics: Array[String],
+    storageLevel: StorageLevel,
+    clientId: Option[String],
+    username: Option[String],
+    password: Option[String],
+    cleanSession: Option[Boolean],
+    qos: Option[Int],
+    connectionTimeout: Option[Int],
+    keepAliveInterval: Option[Int],
+    mqttVersion: Option[Int]) extends Receiver[(String, String)](storageLevel) {
+
+  def onStop(): Unit = {
+    // NOTE(review): no cleanup happens here; the client created in onStart is not closed.
+  }
+
+  def onStart(): Unit = {
+    // Connect to the broker and subscribe to all topics; the callback stores messages.
+    // Set up persistence for messages
+    val persistence = new MemoryPersistence()
+
+    // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistence
+    val client = new MqttClient(brokerUrl, clientId.getOrElse(MqttClient.generateClientId()),
+      persistence)
+
+    // Initialize mqtt parameters; credentials are applied only when both are provided
+    val mqttConnectionOptions = new MqttConnectOptions()
+    for (user <- username; pass <- password) {
+      mqttConnectionOptions.setUserName(user)
+      mqttConnectionOptions.setPassword(pass.toCharArray)
+    }
+    mqttConnectionOptions.setCleanSession(cleanSession.getOrElse(true))
+    connectionTimeout.foreach { timeout =>
+      mqttConnectionOptions.setConnectionTimeout(timeout)
+    }
+    keepAliveInterval.foreach { interval =>
+      mqttConnectionOptions.setKeepAliveInterval(interval)
+    }
+    mqttVersion.foreach { version =>
+      mqttConnectionOptions.setMqttVersion(version)
+    }
+
+    // Callback automatically triggers as and when new message arrives on specified topic
+    val callback = new MqttCallback() {
+
+      // Handles Mqtt message: stores a (topic, UTF-8 decoded payload) pair
+      override def messageArrived(topic: String, message: MqttMessage): Unit = {
+        store((topic, new String(message.getPayload(), StandardCharsets.UTF_8)))
+      }
+
+      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
+      }
+
+      override def connectionLost(cause: Throwable): Unit = {
+        restart("Connection lost ", cause)
+      }
+    }
+
+    // Set up callback for MqttClient. This needs to happen before
+    // connecting or subscribing, otherwise messages may be lost
+    client.setCallback(callback)
+
+    // Connect to MqttBroker
+    client.connect(mqttConnectionOptions)
+
+    // Subscribe to every Mqtt topic in one call. Paho expects one QoS entry
+    // per topic filter, so the configured level (default 1) is repeated for
+    // each topic.
+    val subscriptionQos = qos.getOrElse(1)
+    val qosArray = Array.fill(topics.length)(subscriptionQos)
+
+    client.subscribe(topics, qosArray)
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 7e2f5c7..0accb80 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -199,7 +199,181 @@ object MQTTUtils {
     createStream(jssc.ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
       Option(username), Option(password), Option(cleanSession), None, None, None, None)
   }
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param ssc           StreamingContext object
+   * @param brokerUrl     Url of remote MQTT publisher
+   * @param topics        Array of topic names to subscribe to
+   * @param storageLevel  RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   */
+  def createPairedStream(
+      ssc: StreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[(String, String)] = {
+    new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
+  }
+
 
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param ssc                StreamingContext object
+   * @param brokerUrl          Url of remote MQTT publisher
+   * @param topics             Array of topic names to subscribe to
+   * @param storageLevel       RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param clientId           ClientId to use for the mqtt connection
+   * @param username           Username for authentication to the mqtt publisher
+   * @param password           Password for authentication to the mqtt publisher
+   * @param cleanSession       Sets the mqtt cleanSession parameter
+   * @param qos                Quality of service to use for the topic subscription
+   * @param connectionTimeout  Connection timeout for the mqtt connection
+   * @param keepAliveInterval  Keepalive interval for the mqtt connection
+   * @param mqttVersion        Version to use for the mqtt connection
+   */
+  def createPairedStream(
+      ssc: StreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel,
+      clientId: Option[String],
+      username: Option[String],
+      password: Option[String],
+      cleanSession: Option[Boolean],
+      qos: Option[Int],
+      connectionTimeout: Option[Int],
+      keepAliveInterval: Option[Int],
+      mqttVersion: Option[Int]
+    ): ReceiverInputDStream[(String, String)] = {
+    new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password,
+          cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param jssc      JavaStreamingContext object
+   * @param brokerUrl Url of remote MQTT publisher
+   * @param topics    Array of topic names to subscribe to
+   */
+  def createPairedStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String]
+    ): JavaReceiverInputDStream[(String, String)] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createPairedStream(jssc.ssc, brokerUrl, topics)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc          JavaStreamingContext object
+   * @param brokerUrl     Url of remote MQTT publisher
+   * @param topics        Array of topic names to subscribe to
+   * @param storageLevel  RDD storage level.
+   */
+  def createPairedStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[(String, String)] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc               JavaStreamingContext object
+   * @param brokerUrl          Url of remote MQTT publisher
+   * @param topics             Array of topic names to subscribe to
+   * @param storageLevel       RDD storage level.
+   * @param clientId           ClientId to use for the mqtt connection
+   * @param username           Username for authentication to the mqtt publisher
+   * @param password           Password for authentication to the mqtt publisher
+   * @param cleanSession       Sets the mqtt cleanSession parameter
+   * @param qos                Quality of service to use for the topic subscription
+   * @param connectionTimeout  Connection timeout for the mqtt connection
+   * @param keepAliveInterval  Keepalive interval for the mqtt connection
+   * @param mqttVersion        Version to use for the mqtt connection
+   */
+  def createPairedStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel,
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean,
+      qos: Int,
+      connectionTimeout: Int,
+      keepAliveInterval: Int,
+      mqttVersion: Int
+    ): JavaReceiverInputDStream[(String, String)] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel, Option(clientId),
+        Option(username), Option(password), Option(cleanSession), Option(qos),
+        Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc               JavaStreamingContext object
+   * @param brokerUrl          Url of remote MQTT publisher
+   * @param topics             Array of topic names to subscribe to
+   * @param clientId           ClientId to use for the mqtt connection
+   * @param username           Username for authentication to the mqtt publisher
+   * @param password           Password for authentication to the mqtt publisher
+   * @param cleanSession       Sets the mqtt cleanSession parameter
+   * @param qos                Quality of service to use for the topic subscription
+   * @param connectionTimeout  Connection timeout for the mqtt connection
+   * @param keepAliveInterval  Keepalive interval for the mqtt connection
+   * @param mqttVersion        Version to use for the mqtt connection
+   */
+  def createPairedStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean,
+      qos: Int,
+      connectionTimeout: Int,
+      keepAliveInterval: Int,
+      mqttVersion: Int
+    ): JavaReceiverInputDStream[(String, String)] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
+        Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos),
+        Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc               JavaStreamingContext object
+   * @param brokerUrl          Url of remote MQTT publisher
+   * @param topics             Array of topic names to subscribe to
+   * @param clientId           ClientId to use for the mqtt connection
+   * @param username           Username for authentication to the mqtt publisher
+   * @param password           Password for authentication to the mqtt publisher
+   * @param cleanSession       Sets the mqtt cleanSession parameter
+   */
+  def createPairedStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean
+    ): JavaReceiverInputDStream[(String, String)] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
+        Option(clientId), Option(username), Option(password), Option(cleanSession), None,
+        None, None, None)
+  }
 }
 
 /**
@@ -216,4 +390,12 @@ private[mqtt] class MQTTUtilsPythonHelper {
     ): JavaDStream[String] = {
     MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
   }
+  def createPairedStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel
+    ): JavaDStream[(String, String)] = {
+    MQTTUtils.createPairedStream(jssc, brokerUrl, topics, storageLevel)
+  }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 45332d9..d320595 100644
--- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -18,16 +18,18 @@
 package org.apache.spark.streaming.mqtt;
 
 import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.junit.Test;
 
-import org.apache.spark.streaming.LocalJavaStreamingContext;
+import scala.Tuple2;
 
 public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testMQTTStream() {
     String brokerUrl = "abc";
     String topic = "def";
+    String[] topics = {"def1","def2"};
 
     // tests the API, does not actually test data receiving
     JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
@@ -39,5 +41,16 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
       "testid", "user", "password", true, 1, 10, 30, 3);
     JavaReceiverInputDStream<String> test5 = MQTTUtils.createStream(ssc, brokerUrl, topic,
       "testid", "user", "password", true);
+    JavaReceiverInputDStream<Tuple2<String, String>> test6 = MQTTUtils.createPairedStream(ssc,
+      brokerUrl, topics);
+    JavaReceiverInputDStream<Tuple2<String, String>> test7 = MQTTUtils.createPairedStream(ssc,
+      brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<Tuple2<String, String>> test8 = MQTTUtils.createPairedStream(ssc,
+      brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user",
+      "password", true, 1, 10, 30, 3);
+    JavaReceiverInputDStream<Tuple2<String, String>> test9 = MQTTUtils.createPairedStream(ssc,
+      brokerUrl, topics, "testid", "user", "password", true, 1, 10, 30, 3);
+    JavaReceiverInputDStream<Tuple2<String, String>> test10 = MQTTUtils.createPairedStream(ssc,
+      brokerUrl, topics, "testid", "user", "password", true);
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index fdcd18c..f1d9a20 100644
--- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -33,6 +33,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
   private val master = "local[2]"
   private val framework = this.getClass.getSimpleName
   private val topic = "def"
+  private val topics = Array("def1", "def2")
 
   private var ssc: StreamingContext = _
   private var mqttTestUtils: MQTTTestUtils = _
@@ -76,4 +77,28 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     }
     ssc.stop()
   }
+  test("mqtt input stream2") {
+    val sendMessage1 = "MQTT demo for spark streaming1"
+    val sendMessage2 = "MQTT demo for spark streaming2"
+    val receiveStream2 = MQTTUtils.createPairedStream(ssc, "tcp://" + mqttTestUtils.brokerUri,
+        topics, StorageLevel.MEMORY_ONLY)
+
+    @volatile var receiveMessage: List[String] = List()
+    receiveStream2.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first()._2)
+        receiveMessage
+      }
+    }
+
+    ssc.start()
+
+    // Retry it because we don't know when the receiver will start.
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      mqttTestUtils.publishData(topics(0), sendMessage1)
+      mqttTestUtils.publishData(topics(1), sendMessage2)
+      assert(receiveMessage.contains(sendMessage1)||receiveMessage.contains(sendMessage2))
+    }
+    ssc.stop()
+  }
 }