Posted to commits@spark.apache.org by td...@apache.org on 2015/01/05 04:38:03 UTC

spark git commit: [SPARK-4631] unit test for MQTT

Repository: spark
Updated Branches:
  refs/heads/master 3fddc9468 -> e767d7dda


[SPARK-4631] unit test for MQTT

Please review the unit test for MQTT

Author: bilna <bi...@am.amrita.edu>
Author: Bilna P <bi...@gmail.com>

Closes #3844 from Bilna/master and squashes the following commits:

acea3a3 [bilna] Adding dependency with scope test
28681fa [bilna] Merge remote-tracking branch 'upstream/master'
fac3904 [bilna] Correction in Indentation and coding style
ed9db4c [bilna] Merge remote-tracking branch 'upstream/master'
4b34ee7 [Bilna P] Update MQTTStreamSuite.scala
04503cf [bilna] Added embedded broker service for mqtt test
89d804e [bilna] Merge remote-tracking branch 'upstream/master'
fc8eb28 [bilna] Merge remote-tracking branch 'upstream/master'
4b58094 [Bilna P] Update MQTTStreamSuite.scala
b1ac4ad [bilna] Added BeforeAndAfter
5f6bfd2 [bilna] Added BeforeAndAfter
e8b6623 [Bilna P] Update MQTTStreamSuite.scala
5ca6691 [Bilna P] Update MQTTStreamSuite.scala
8616495 [bilna] [SPARK-4631] unit test for MQTT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e767d7dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e767d7dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e767d7dd

Branch: refs/heads/master
Commit: e767d7ddac5c2330af553f2a74b8575dfc7afb67
Parents: 3fddc94
Author: bilna <bi...@am.amrita.edu>
Authored: Sun Jan 4 19:37:48 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 4 19:37:48 2015 -0800

----------------------------------------------------------------------
 external/mqtt/pom.xml                           |   6 +
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  | 110 ++++++++++++++++---
 2 files changed, 101 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e767d7dd/external/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 9025915..d478267 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -66,6 +66,12 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <version>5.7.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
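
For reference, and not part of the commit: the Maven coordinates above translate to the
following sbt dependency if the module is built with sbt instead of Maven. This is a
minimal sketch only; the module's actual sbt wiring is not shown in this diff.

    libraryDependencies += "org.apache.activemq" % "activemq-core" % "5.7.0" % "test"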

http://git-wip-us.apache.org/repos/asf/spark/blob/e767d7dd/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 84595ac..98fe6cb 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -17,31 +17,111 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT()
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT()
+  }
 
-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100)
+          msgTopic.publish(message)
+      }
+    } finally {
+      client.disconnect()
+      client.close()
+      client = null
+    }
+  }
 }
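
For readers who want to verify the embedded-broker wiring outside of Spark, here is a
minimal, self-contained sketch (not part of the commit) of the pattern the new test
relies on: start an embedded ActiveMQ broker with an MQTT transport connector, publish
with one Paho client, and receive with a second Paho client's callback. The fixed port
1883, the client ids, and the object name are illustrative assumptions; the suite itself
picks a free port via Utils.startServiceOnPort.

    import java.net.URI
    import java.util.concurrent.{CountDownLatch, TimeUnit}

    import org.apache.activemq.broker.{BrokerService, TransportConnector}
    import org.eclipse.paho.client.mqttv3._
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

    object EmbeddedMqttRoundTrip {
      def main(args: Array[String]): Unit = {
        // Embedded broker, non-persistent so nothing is written to disk.
        val broker = new BrokerService()
        broker.setPersistent(false)
        val connector = new TransportConnector()
        connector.setName("mqtt")
        connector.setUri(new URI("mqtt://localhost:1883"))
        broker.addConnector(connector)
        broker.start()

        val latch = new CountDownLatch(1)

        // Subscriber: register the callback before connecting, then subscribe.
        val subscriber = new MqttClient("tcp://localhost:1883", "sketch-sub", new MemoryPersistence())
        subscriber.setCallback(new MqttCallback {
          override def connectionLost(cause: Throwable): Unit = {}
          override def deliveryComplete(token: IMqttDeliveryToken): Unit = {}
          override def messageArrived(topic: String, message: MqttMessage): Unit = {
            println("received on " + topic + ": " + new String(message.getPayload, "utf-8"))
            latch.countDown()
          }
        })
        subscriber.connect()
        subscriber.subscribe("def")

        // Publisher: mirrors publishData() in the suite, with connection-safe cleanup.
        val publisher = new MqttClient("tcp://localhost:1883", "sketch-pub", new MemoryPersistence())
        try {
          publisher.connect()
          val message = new MqttMessage("MQTT demo for spark streaming".getBytes("utf-8"))
          message.setQos(1)
          publisher.getTopic("def").publish(message)
          latch.await(10, TimeUnit.SECONDS)
        } finally {
          if (publisher.isConnected) publisher.disconnect()
          publisher.close()
          if (subscriber.isConnected) subscriber.disconnect()
          subscriber.close()
          broker.stop()
        }
      }
    }

Run it with the same activemq-core and Paho test dependencies on the classpath; the
suite above does the equivalent inside its before/after hooks and asserts on the
received message with ScalaTest's eventually.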

