You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2017/07/19 23:50:03 UTC

bahir git commit: [BAHIR-100] Enhance MQTT connector to support byte arrays

Repository: bahir
Updated Branches:
  refs/heads/master dca8d4c2d -> e3d9e6960


[BAHIR-100] Enhance MQTT connector to support byte arrays

Closes #47


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/e3d9e696
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/e3d9e696
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/e3d9e696

Branch: refs/heads/master
Commit: e3d9e6960941696ba073735e9d039c85146c217a
Parents: dca8d4c
Author: drosenst <da...@intel.com>
Authored: Wed Jul 5 23:41:02 2017 +0300
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Jul 19 16:47:26 2017 -0700

----------------------------------------------------------------------
 streaming-mqtt/README.md                        |   3 +
 .../spark/streaming/mqtt/MQTTInputDStream.scala |   1 +
 .../mqtt/MQTTPairedByteArrayInputDStream.scala  | 144 ++++++++
 .../streaming/mqtt/MQTTPairedInputDStream.scala |   1 +
 .../apache/spark/streaming/mqtt/MQTTUtils.scala | 366 ++++++++++++++-----
 .../streaming/mqtt/JavaMQTTStreamSuite.java     |  20 +-
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  |  26 ++
 7 files changed, 477 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 6b89136..eb08b51 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -52,12 +52,14 @@ this actor can be configured to handle failures, etc.
 
     val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
     val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topic)
+    val lines = MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics)
 
 Additional mqtt connection options can be provided:
 
 ```Scala
 val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
 val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val lines = MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
 ```
 
 ### Java API
@@ -67,5 +69,6 @@ this actor can be configured to handle failures, etc.
 
     JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic);
     JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics);
+    JavaReceiverInputDStream<Tuple2<String, byte[]>> lines = MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics);
 
 See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 328656b..cf27440 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that subscribe messages from a Mqtt Broker.
  * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param _ssc               Spark Streaming StreamingContext
  * @param brokerUrl          Url of remote mqtt publisher
  * @param topic              topic name to subscribe to
  * @param storageLevel       RDD storage level.

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala
new file mode 100644
index 0000000..07c0b18
--- /dev/null
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Input stream that subscribes to messages from an MQTT broker and emits each message as a
+ * (topic, raw payload bytes) pair, so that binary payloads are not forced through a String.
+ * Uses Eclipse Paho as the MQTT client: http://www.eclipse.org/paho/
+ *
+ * @param _ssc               Spark Streaming StreamingContext
+ * @param brokerUrl          Url of remote mqtt publisher
+ * @param topics             Array of topic names to subscribe to
+ * @param storageLevel       RDD storage level.
+ * @param clientId           ClientId to use for the mqtt connection (generated when None)
+ * @param username           Username for authentication to the mqtt publisher
+ *                           (only applied together with password)
+ * @param password           Password for authentication to the mqtt publisher
+ *                           (only applied together with username)
+ * @param cleanSession       Sets the mqtt cleanSession parameter
+ * @param qos                Quality of service to use for the topic subscription
+ * @param connectionTimeout  Connection timeout for the mqtt connection
+ * @param keepAliveInterval  Keepalive interval for the mqtt connection
+ * @param mqttVersion        Version to use for the mqtt connection
+ */
+private[streaming] class MQTTPairedByteArrayInputDStream(
+    _ssc: StreamingContext,
+    brokerUrl: String,
+    topics: Array[String],
+    storageLevel: StorageLevel,
+    clientId: Option[String] = None,
+    username: Option[String] = None,
+    password: Option[String] = None,
+    cleanSession: Option[Boolean] = None,
+    qos: Option[Int] = None,
+    connectionTimeout: Option[Int] = None,
+    keepAliveInterval: Option[Int] = None,
+    mqttVersion: Option[Int] = None) extends ReceiverInputDStream[(String, Array[Byte])](_ssc) {
+
+  private[streaming] override def name: String = s"MQTT stream [$id]"
+
+  /** Builds the receiver that performs the actual MQTT subscription on an executor. */
+  def getReceiver(): Receiver[(String, Array[Byte])] = {
+    new MQTTByteArrayPairReceiver(brokerUrl, topics, storageLevel, clientId, username,
+        password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+  }
+}
+
+/**
+ * Receiver that subscribes to the given MQTT topics with an Eclipse Paho MqttClient and stores
+ * every arriving message as a (topic, payload bytes) pair.
+ *
+ * The client created in onStart() is retained so that onStop() can disconnect and close it.
+ * Spark performs a restart() (used below on connectionLost) by calling onStop() and then
+ * onStart(). Without this cleanup, every restart would leave the previous client and its
+ * broker connection behind.
+ *
+ * @param brokerUrl          Url of remote mqtt publisher
+ * @param topics             Topic names to subscribe to
+ * @param storageLevel       RDD storage level
+ * @param clientId           ClientId to use; a random one is generated when None
+ * @param username           Username; only applied when password is also defined
+ * @param password           Password; only applied when username is also defined
+ * @param cleanSession       mqtt cleanSession flag; true when None
+ * @param qos                Quality of service used for every subscribed topic; 1 when None
+ * @param connectionTimeout  Connection timeout; left at the MqttConnectOptions default when None
+ * @param keepAliveInterval  Keepalive interval; left at the MqttConnectOptions default when None
+ * @param mqttVersion        MQTT protocol version; left at the MqttConnectOptions default when None
+ */
+private[streaming] class MQTTByteArrayPairReceiver(
+  brokerUrl: String,
+  topics: Array[String],
+  storageLevel: StorageLevel,
+  clientId: Option[String],
+  username: Option[String],
+  password: Option[String],
+  cleanSession: Option[Boolean],
+  qos: Option[Int],
+  connectionTimeout: Option[Int],
+  keepAliveInterval: Option[Int],
+  mqttVersion: Option[Int]) extends Receiver[(String, Array[Byte])](storageLevel) {
+
+  // Client of the current run. It is set in onStart() and cleared in onStop(), which restart()
+  // runs on a background thread, hence @volatile. MqttClient is not serializable, so the
+  // field is @transient.
+  @transient @volatile private var client: MqttClient = _
+
+  def onStop() {
+    val current = client
+    client = null
+    if (current != null) {
+      // Best-effort cleanup: failures are ignored because the receiver is shutting down anyway.
+      try {
+        if (current.isConnected) {
+          current.disconnect()
+        }
+      } catch {
+        case _: MqttException =>
+      }
+      try {
+        current.close()
+      } catch {
+        case _: MqttException =>
+      }
+    }
+  }
+
+  def onStart() {
+
+    // Set up persistence for messages
+    val persistence = new MemoryPersistence()
+
+    // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
+    val mqttClient = new MqttClient(brokerUrl, clientId.getOrElse(MqttClient.generateClientId()),
+      persistence)
+    // Store it before connecting so that onStop() can release it even if connect() below throws.
+    client = mqttClient
+
+    // Initialize mqtt parameters; options left as None keep the MqttConnectOptions defaults
+    val mqttConnectionOptions = new MqttConnectOptions()
+    for (user <- username; pass <- password) {
+      mqttConnectionOptions.setUserName(user)
+      mqttConnectionOptions.setPassword(pass.toCharArray)
+    }
+    mqttConnectionOptions.setCleanSession(cleanSession.getOrElse(true))
+    connectionTimeout.foreach(timeout => mqttConnectionOptions.setConnectionTimeout(timeout))
+    keepAliveInterval.foreach(interval => mqttConnectionOptions.setKeepAliveInterval(interval))
+    mqttVersion.foreach(version => mqttConnectionOptions.setMqttVersion(version))
+
+    // Callback automatically triggers as and when new message arrives on specified topic
+    val callback = new MqttCallback() {
+
+      // Handles Mqtt message: keep the raw payload bytes, no decoding
+      override def messageArrived(topic: String, message: MqttMessage) {
+        store((topic, message.getPayload()))
+      }
+
+      override def deliveryComplete(token: IMqttDeliveryToken) {
+      }
+
+      override def connectionLost(cause: Throwable) {
+        restart("Connection lost ", cause)
+      }
+    }
+
+    // Set up callback for MqttClient. This needs to happen before
+    // connecting or subscribing, otherwise messages may be lost
+    mqttClient.setCallback(callback)
+
+    // Connect to MqttBroker
+    mqttClient.connect(mqttConnectionOptions)
+
+    // Subscribe to every topic with the same QoS level
+    val qosArray = Array.fill(topics.length)(qos.getOrElse(1))
+    mqttClient.subscribe(topics, qosArray)
+  }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
index 050777b..ec89ed7 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that subscribe messages from a Mqtt Broker.
  * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param _ssc               Spark Streaming StreamingContext
  * @param brokerUrl          Url of remote mqtt publisher
  * @param topics             topic name Array to subscribe to
  * @param storageLevel       RDD storage level.

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 0accb80..f42275f 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -215,22 +215,39 @@ object MQTTUtils {
     new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
   }
 
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher, keeping each
+   * message payload as raw bytes.
+   *
+   * @param ssc           StreamingContext object
+   * @param brokerUrl     Url of remote MQTT publisher
+   * @param topics        Array of topic names to subscribe to
+   * @param storageLevel  RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   */
 
-  /**
-   * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param ssc                StreamingContext object
-   * @param brokerUrl          Url of remote MQTT publisher
-   * @param topics             Array of topic names to subscribe to
-   * @param storageLevel       RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param clientId           ClientId to use for the mqtt connection
-   * @param username           Username for authentication to the mqtt publisher
-   * @param password           Password for authentication to the mqtt publisher
-   * @param cleanSession       Sets the mqtt cleanSession parameter
-   * @param qos                Quality of service to use for the topic subscription
-   * @param connectionTimeout  Connection timeout for the mqtt connection
-   * @param keepAliveInterval  Keepalive interal for the mqtt connection
-   * @param mqttVersion        Version to use for the mqtt connection
-   */
+  def createPairedByteArrayStream(
+      ssc: StreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[(String, Array[Byte])] = {
+    // Only the mandatory settings are passed; every optional MQTT setting is left at the
+    // constructor defaults of MQTTPairedByteArrayInputDStream.
+    new MQTTPairedByteArrayInputDStream(ssc, brokerUrl, topics, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   *
+   * @param ssc               StreamingContext object
+   * @param brokerUrl         Url of remote MQTT publisher
+   * @param topics            Array of topic names to subscribe to
+   * @param storageLevel      RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param clientId          ClientId to use for the mqtt connection
+   * @param username          Username for authentication to the mqtt publisher
+   * @param password          Password for authentication to the mqtt publisher
+   * @param cleanSession      Sets the mqtt cleanSession parameter
+   * @param qos               Quality of service to use for the topic subscription
+   * @param connectionTimeout Connection timeout for the mqtt connection
+   * @param keepAliveInterval Keepalive interval for the mqtt connection
+   * @param mqttVersion       Version to use for the mqtt connection
+   */
   def createPairedStream(
       ssc: StreamingContext,
       brokerUrl: String,
@@ -246,57 +263,130 @@ object MQTTUtils {
       mqttVersion: Option[Int]
     ): ReceiverInputDStream[(String, String)] = {
     new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password,
-          cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+      cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
   }
 
-  /**
-   * Create an input stream that receives messages pushed by a MQTT publisher.
-   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param jssc      JavaStreamingContext object
-   * @param brokerUrl Url of remote MQTT publisher
-   * @param topics    Array of topic names to subscribe to
-   */
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher, keeping each
+   * message payload as raw bytes.
+   *
+   * @param ssc               StreamingContext object
+   * @param brokerUrl         Url of remote MQTT publisher
+   * @param topics            Array of topic names to subscribe to
+   * @param storageLevel      RDD storage level.
+   * @param clientId          ClientId to use for the mqtt connection
+   * @param username          Username for authentication to the mqtt publisher
+   * @param password          Password for authentication to the mqtt publisher
+   * @param cleanSession      Sets the mqtt cleanSession parameter
+   * @param qos               Quality of service to use for the topic subscription
+   * @param connectionTimeout Connection timeout for the mqtt connection
+   * @param keepAliveInterval Keepalive interval for the mqtt connection
+   * @param mqttVersion       Version to use for the mqtt connection
+   * @return a stream of (topic, message payload) pairs
+   */
+  def createPairedByteArrayStream(
+      ssc: StreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel,
+      clientId: Option[String],
+      username: Option[String],
+      password: Option[String],
+      cleanSession: Option[Boolean],
+      qos: Option[Int],
+      connectionTimeout: Option[Int],
+      keepAliveInterval: Option[Int],
+      mqttVersion: Option[Int]
+    ): ReceiverInputDStream[(String, Array[Byte])] = {
+    new MQTTPairedByteArrayInputDStream(ssc, brokerUrl, topics, storageLevel, clientId, username,
+      password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+   *
+   * @param jssc      JavaStreamingContext object
+   * @param brokerUrl Url of remote MQTT publisher
+   * @param topics    Array of topic names to subscribe to
+   */
   def createPairedStream(
       jssc: JavaStreamingContext,
       brokerUrl: String,
       topics: Array[String]
     ): JavaReceiverInputDStream[(String, String)] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createPairedStream(jssc.ssc, brokerUrl, topics)
   }
 
-  /**
-   * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param jssc          JavaStreamingContext object
-   * @param brokerUrl     Url of remote MQTT publisher
-   * @param topics        Array of topic names to subscribe to
-   * @param storageLevel  RDD storage level.
-   */
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher, keeping each
+   * message payload as raw bytes (a `byte[]` when used from Java).
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+   *
+   * @param jssc      JavaStreamingContext object
+   * @param brokerUrl Url of remote MQTT publisher
+   * @param topics    Array of topic names to subscribe to
+   */
+  def createPairedByteArrayStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String]
+    ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+    createPairedByteArrayStream(jssc.ssc, brokerUrl, topics)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   *
+   * @param jssc         JavaStreamingContext object
+   * @param brokerUrl    Url of remote MQTT publisher
+   * @param topics       Array of topic names to subscribe to
+   * @param storageLevel RDD storage level.
+   */
   def createPairedStream(
       jssc: JavaStreamingContext,
       brokerUrl: String,
       topics: Array[String],
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[(String, String)] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel)
   }
 
-  /**
-   * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param jssc               JavaStreamingContext object
-   * @param brokerUrl          Url of remote MQTT publisher
-   * @param topics             Array of topic names to subscribe to
-   * @param storageLevel       RDD storage level.
-   * @param clientId           ClientId to use for the mqtt connection
-   * @param username           Username for authentication to the mqtt publisher
-   * @param password           Password for authentication to the mqtt publisher
-   * @param cleanSession       Sets the mqtt cleanSession parameter
-   * @param qos                Quality of service to use for the topic subscription
-   * @param connectionTimeout  Connection timeout for the mqtt connection
-   * @param keepAliveInterval  Keepalive interal for the mqtt connection
-   * @param mqttVersion        Version to use for the mqtt connection
-   */
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher, keeping each
+   * message payload as raw bytes (a `byte[]` when used from Java).
+   *
+   * @param jssc         JavaStreamingContext object
+   * @param brokerUrl    Url of remote MQTT publisher
+   * @param topics       Array of topic names to subscribe to
+   * @param storageLevel RDD storage level.
+   */
+  def createPairedByteArrayStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+    createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   *
+   * @param jssc              JavaStreamingContext object
+   * @param brokerUrl         Url of remote MQTT publisher
+   * @param topics            Array of topic names to subscribe to
+   * @param storageLevel      RDD storage level.
+   * @param clientId          ClientId to use for the mqtt connection
+   * @param username          Username for authentication to the mqtt publisher
+   * @param password          Password for authentication to the mqtt publisher
+   * @param cleanSession      Sets the mqtt cleanSession parameter
+   * @param qos               Quality of service to use for the topic subscription
+   * @param connectionTimeout Connection timeout for the mqtt connection
+   * @param keepAliveInterval Keepalive interval for the mqtt connection
+   * @param mqttVersion       Version to use for the mqtt connection
+   */
   def createPairedStream(
       jssc: JavaStreamingContext,
       brokerUrl: String,
@@ -317,20 +407,57 @@ object MQTTUtils {
         Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
   }
 
-  /**
-   * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param jssc               JavaStreamingContext object
-   * @param brokerUrl          Url of remote MQTT publisher
-   * @param topics             Array of topic names to subscribe to
-   * @param clientId           ClientId to use for the mqtt connection
-   * @param username           Username for authentication to the mqtt publisher
-   * @param password           Password for authentication to the mqtt publisher
-   * @param cleanSession       Sets the mqtt cleanSession parameter
-   * @param qos                Quality of service to use for the topic subscription
-   * @param connectionTimeout  Connection timeout for the mqtt connection
-   * @param keepAliveInterval  Keepalive interal for the mqtt connection
-   * @param mqttVersion        Version to use for the mqtt connection
-   */
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher, keeping each
+   * message payload as raw bytes (a `byte[]` when used from Java).
+   *
+   * A null clientId, username or password is treated as "not set", because Option(null) is None.
+   * The primitive settings cannot be null, so they are always forwarded as Some(value).
+   *
+   * @param jssc              JavaStreamingContext object
+   * @param brokerUrl         Url of remote MQTT publisher
+   * @param topics            Array of topic names to subscribe to
+   * @param storageLevel      RDD storage level.
+   * @param clientId          ClientId to use for the mqtt connection
+   * @param username          Username for authentication to the mqtt publisher
+   * @param password          Password for authentication to the mqtt publisher
+   * @param cleanSession      Sets the mqtt cleanSession parameter
+   * @param qos               Quality of service to use for the topic subscription
+   * @param connectionTimeout Connection timeout for the mqtt connection
+   * @param keepAliveInterval Keepalive interval for the mqtt connection
+   * @param mqttVersion       Version to use for the mqtt connection
+   */
+  def createPairedByteArrayStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel,
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean,
+      qos: Int,
+      connectionTimeout: Int,
+      keepAliveInterval: Int,
+      mqttVersion: Int
+    ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+    createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, storageLevel, Option(clientId),
+      Option(username), Option(password), Option(cleanSession), Option(qos),
+      Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   *
+   * @param jssc              JavaStreamingContext object
+   * @param brokerUrl         Url of remote MQTT publisher
+   * @param topics            Array of topic names to subscribe to
+   * @param clientId          ClientId to use for the mqtt connection
+   * @param username          Username for authentication to the mqtt publisher
+   * @param password          Password for authentication to the mqtt publisher
+   * @param cleanSession      Sets the mqtt cleanSession parameter
+   * @param qos               Quality of service to use for the topic subscription
+   * @param connectionTimeout Connection timeout for the mqtt connection
+   * @param keepAliveInterval Keepalive interval for the mqtt connection
+   * @param mqttVersion       Version to use for the mqtt connection
+   */
   def createPairedStream(
       jssc: JavaStreamingContext,
       brokerUrl: String,
@@ -346,20 +473,56 @@ object MQTTUtils {
     ): JavaReceiverInputDStream[(String, String)] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
-        Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos),
-        Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+      Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos),
+      Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
   }
 
-  /**
-   * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param jssc               JavaStreamingContext object
-   * @param brokerUrl          Url of remote MQTT publisher
-   * @param topics             Array of topic names to subscribe to
-   * @param clientId           ClientId to use for the mqtt connection
-   * @param username           Username for authentication to the mqtt publisher
-   * @param password           Password for authentication to the mqtt publisher
-   * @param cleanSession       Sets the mqtt cleanSession parameter
-   */
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher, keeping each
+   * message payload as raw bytes (a `byte[]` when used from Java).
+   * Storage level of the data will be StorageLevel.MEMORY_AND_DISK_SER_2.
+   *
+   * A null clientId, username or password is treated as "not set", because Option(null) is None.
+   *
+   * @param jssc              JavaStreamingContext object
+   * @param brokerUrl         Url of remote MQTT publisher
+   * @param topics            Array of topic names to subscribe to
+   * @param clientId          ClientId to use for the mqtt connection
+   * @param username          Username for authentication to the mqtt publisher
+   * @param password          Password for authentication to the mqtt publisher
+   * @param cleanSession      Sets the mqtt cleanSession parameter
+   * @param qos               Quality of service to use for the topic subscription
+   * @param connectionTimeout Connection timeout for the mqtt connection
+   * @param keepAliveInterval Keepalive interval for the mqtt connection
+   * @param mqttVersion       Version to use for the mqtt connection
+   */
+  def createPairedByteArrayStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean,
+      qos: Int,
+      connectionTimeout: Int,
+      keepAliveInterval: Int,
+      mqttVersion: Int
+    ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+    createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
+      Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos),
+      Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+  }
+
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   *
+   * @param jssc         JavaStreamingContext object
+   * @param brokerUrl    Url of remote MQTT publisher
+   * @param topics       Array of topic names to subscribe to
+   * @param clientId     ClientId to use for the mqtt connection
+   * @param username     Username for authentication to the mqtt publisher
+   * @param password     Password for authentication to the mqtt publisher
+   * @param cleanSession Sets the mqtt cleanSession parameter
+   */
   def createPairedStream(
       jssc: JavaStreamingContext,
       brokerUrl: String,
@@ -368,12 +531,40 @@ object MQTTUtils {
       username: String,
       password: String,
       cleanSession: Boolean
-    ): JavaReceiverInputDStream[(String, String)] = {
+      ): JavaReceiverInputDStream[(String, String)] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
-        Option(clientId), Option(username), Option(password), Option(cleanSession), None,
-        None, None, None)
+      Option(clientId), Option(username), Option(password), Option(cleanSession), None,
+      None, None, None)
   }
+
+
+  /**
+   * Create an input stream that receives byte-array messages pushed by a MQTT publisher.
+   * Uses MEMORY_AND_DISK_SER_2 storage; qos, timeouts and MQTT version are left unset.
+   * @param jssc         JavaStreamingContext object
+   * @param brokerUrl    URL of the remote MQTT broker
+   * @param topics       Array of topic names to subscribe to
+   * @param clientId     ClientId to use for the mqtt connection
+   * @param username     Username for authentication to the mqtt broker
+   * @param password     Password for authentication to the mqtt broker
+   * @param cleanSession Sets the mqtt cleanSession parameter
+   */
+  def createPairedByteArrayStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean
+    ): JavaReceiverInputDStream[(String, Array[Byte])] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]  // result discarded; no effect
+    createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2,
+      Option(clientId), Option(username), Option(password), Option(cleanSession), None,
+      None, None, None)
+  }
+
 }
 
 /**
@@ -398,4 +589,13 @@ private[mqtt] class MQTTUtilsPythonHelper {
     ): JavaDStream[(String, String)] = {
     MQTTUtils.createPairedStream(jssc, brokerUrl, topics, storageLevel)
   }
+
+  def createPairedByteArrayStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topics: Array[String],
+      storageLevel: StorageLevel
+    ): JavaDStream[(String, Array[Byte])] = {  // Python-facing; delegates to the Java API
+    MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics, storageLevel)
+  }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index d320595..e30d187 100644
--- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -49,8 +49,26 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
       brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user",
       "password", true, 1, 10, 30, 3);
     JavaReceiverInputDStream<Tuple2<String, String>> test9 = MQTTUtils.createPairedStream(ssc,
-      brokerUrl, topics, "testid", "user", "password", true, 1, 10, 30, 3);
+      brokerUrl, topics, "testid", "user", "password", true, 1,
+      10, 30, 3);
     JavaReceiverInputDStream<Tuple2<String, String>> test10 = MQTTUtils.createPairedStream(ssc,
       brokerUrl, topics, "testid", "user", "password", true);
+    JavaReceiverInputDStream<Tuple2<String, byte[]>> test11 =
+            MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics);
+    JavaReceiverInputDStream<Tuple2<String, byte[]>> test12 =
+            MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics,
+            StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<Tuple2<String, byte[]>> test13 =
+            MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics,
+            StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user",
+            "password", true, 1, 10, 30, 3);
+    JavaReceiverInputDStream<Tuple2<String, byte[]>> test14 =
+            MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics,
+            "testid", "user", "password", true,
+             1, 10, 30, 3);
+    JavaReceiverInputDStream<Tuple2<String, byte[]>> test15 =
+            MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, "testid",
+            "user", "password", true);
+
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index f1d9a20..6ef551b 100644
--- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -101,4 +101,30 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     }
     ssc.stop()
   }
+
+  test("mqtt input stream3") {
+    val sendMessage1 = "MQTT demo for spark streaming1"
+    val sendMessage2 = "MQTT demo for spark streaming2"
+    val receiveStream = MQTTUtils.createPairedByteArrayStream(ssc,
+      "tcp://" + mqttTestUtils.brokerUri, topics, StorageLevel.MEMORY_ONLY)
+
+    @volatile var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      // One collect per batch; decode every payload (UTF-8 assumed to match publishData).
+      val payloads = rdd.collect().map(_._2)
+        .map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
+      receiveMessage = receiveMessage ::: payloads.toList
+    }
+
+    ssc.start()
+
+    // Retry it because we don't know when the receiver will start.
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      mqttTestUtils.publishData(topics(0), sendMessage1)
+      mqttTestUtils.publishData(topics(1), sendMessage2)
+      assert(receiveMessage.contains(sendMessage1) || receiveMessage.contains(sendMessage2))
+    }
+    ssc.stop()
+  }
+
 }