You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/02/25 15:37:40 UTC

spark git commit: [SPARK-5666][streaming][MQTT streaming] some trivial fixes

Repository: spark
Updated Branches:
  refs/heads/master d641fbb39 -> d51ed263e


[SPARK-5666][streaming][MQTT streaming] some trivial fixes

modified to adhere to accepted coding standards as pointed out by tdas in PR #3844

Author: prabs <pr...@gmail.com>
Author: Prabeesh K <pr...@gmail.com>

Closes #4178 from prabeesh/master and squashes the following commits:

bd2cb49 [Prabeesh K] address the comment
ccc0765 [prabs] address the comment
46f9619 [prabs] address the comment
c035bdc [prabs] address the comment
22dd7f7 [prabs] address the comments
0cc67bd [prabs] address the comment
838c38e [prabs] address the comment
cd57029 [prabs] address the comments
66919a3 [Prabeesh K] changed MqttDefaultFilePersistence to MemoryPersistence
5857989 [prabs] modified to adhere to accepted coding standards


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d51ed263
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d51ed263
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d51ed263

Branch: refs/heads/master
Commit: d51ed263ee791967380de6b9c892985ce87f6fcb
Parents: d641fbb
Author: prabs <pr...@gmail.com>
Authored: Wed Feb 25 14:37:35 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Feb 25 14:37:35 2015 +0000

----------------------------------------------------------------------
 .../examples/streaming/MQTTWordCount.scala      | 49 ++++++++++++--------
 .../spark/streaming/mqtt/MQTTInputDStream.scala | 26 +++++------
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |  3 +-
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  | 12 ++---
 4 files changed, 50 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d51ed263/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index 6ff0c47..f40caad 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.examples.streaming
 
-import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -31,8 +31,6 @@ import org.apache.spark.SparkConf
  */
 object MQTTPublisher {
 
-  var client: MqttClient = _
-
   def main(args: Array[String]) {
     if (args.length < 2) {
       System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
@@ -42,25 +40,36 @@ object MQTTPublisher {
     StreamingExamples.setStreamingLogLevels()
 
     val Seq(brokerUrl, topic) = args.toSeq
+    
+    var client: MqttClient = null
 
     try {
-      var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
-      client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+      val persistence = new MemoryPersistence()
+      client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+
+      client.connect()
+
+      val msgtopic  = client.getTopic(topic)
+      val msgContent = "hello mqtt demo for spark streaming"
+      val message = new MqttMessage(msgContent.getBytes("utf-8"))
+
+      while (true) {
+        try {
+          msgtopic.publish(message)
+          println(s"Published data. topic: {msgtopic.getName()}; Message: {message}")
+        } catch {
+          case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+            Thread.sleep(10) 
+            println("Queue is full, wait for to consume data from the message queue")
+        }  
+      }      
     } catch {
       case e: MqttException => println("Exception Caught: " + e)
+    } finally {
+      if (client != null) {
+        client.disconnect()
+      }
     }
-
-    client.connect()
-
-    val msgtopic: MqttTopic = client.getTopic(topic)
-    val msg: String = "hello mqtt demo for spark streaming"
-
-    while (true) {
-      val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8"))
-      msgtopic.publish(message)
-      println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
-    }
-   client.disconnect()
   }
 }
 
@@ -96,9 +105,9 @@ object MQTTWordCount {
     val sparkConf = new SparkConf().setAppName("MQTTWordCount")
     val ssc = new StreamingContext(sparkConf, Seconds(2))
     val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
-
-    val words = lines.flatMap(x => x.toString.split(" "))
+    val words = lines.flatMap(x => x.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    
     wordCounts.print()
     ssc.start()
     ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/d51ed263/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 1ef91dd..3c0ef94 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -17,23 +17,23 @@
 
 package org.apache.spark.streaming.mqtt
 
+import java.io.IOException
+import java.util.concurrent.Executors
+import java.util.Properties
+
+import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import java.util.Properties
-import java.util.concurrent.Executors
-import java.io.IOException
-
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
 import org.eclipse.paho.client.mqttv3.MqttCallback
 import org.eclipse.paho.client.mqttv3.MqttClient
 import org.eclipse.paho.client.mqttv3.MqttClientPersistence
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
 import org.eclipse.paho.client.mqttv3.MqttException
 import org.eclipse.paho.client.mqttv3.MqttMessage
 import org.eclipse.paho.client.mqttv3.MqttTopic
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
@@ -82,18 +82,18 @@ class MQTTReceiver(
     val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
 
     // Callback automatically triggers as and when new message arrives on specified topic
-    val callback: MqttCallback = new MqttCallback() {
+    val callback = new MqttCallback() {
 
       // Handles Mqtt message
-      override def messageArrived(arg0: String, arg1: MqttMessage) {
-        store(new String(arg1.getPayload(),"utf-8"))
+      override def messageArrived(topic: String, message: MqttMessage) {
+        store(new String(message.getPayload(),"utf-8"))
       }
 
-      override def deliveryComplete(arg0: IMqttDeliveryToken) {
+      override def deliveryComplete(token: IMqttDeliveryToken) {
       }
 
-      override def connectionLost(arg0: Throwable) {
-        restart("Connection lost ", arg0)
+      override def connectionLost(cause: Throwable) {
+        restart("Connection lost ", cause)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d51ed263/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index c5ffe51..1142d0f 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.streaming.mqtt
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
-import scala.reflect.ClassTag
 import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 object MQTTUtils {

http://git-wip-us.apache.org/repos/asf/spark/blob/d51ed263/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 19c9271..0f3298a 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -42,8 +42,8 @@ import org.apache.spark.util.Utils
 class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
   private val batchDuration = Milliseconds(500)
-  private val master: String = "local[2]"
-  private val framework: String = this.getClass.getSimpleName
+  private val master = "local[2]"
+  private val framework = this.getClass.getSimpleName
   private val freePort = findFreePort()
   private val brokerUri = "//localhost:" + freePort
   private val topic = "def"
@@ -69,7 +69,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
   test("mqtt input stream") {
     val sendMessage = "MQTT demo for spark streaming"
-    val receiveStream: ReceiverInputDStream[String] =
+    val receiveStream =
       MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
     @volatile var receiveMessage: List[String] = List()
     receiveStream.foreachRDD { rdd =>
@@ -123,12 +123,12 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
   def publishData(data: String): Unit = {
     var client: MqttClient = null
     try {
-      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
       client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
       client.connect()
       if (client.isConnected) {
-        val msgTopic: MqttTopic = client.getTopic(topic)
-        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        val msgTopic = client.getTopic(topic)
+        val message = new MqttMessage(data.getBytes("utf-8"))
         message.setQos(1)
         message.setRetained(true)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org