You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/01/05 04:38:03 UTC
spark git commit: [SPARK-4631] unit test for MQTT
Repository: spark
Updated Branches:
refs/heads/master 3fddc9468 -> e767d7dda
[SPARK-4631] unit test for MQTT
Please review the unit test for MQTT
Author: bilna <bi...@am.amrita.edu>
Author: Bilna P <bi...@gmail.com>
Closes #3844 from Bilna/master and squashes the following commits:
acea3a3 [bilna] Adding dependency with scope test
28681fa [bilna] Merge remote-tracking branch 'upstream/master'
fac3904 [bilna] Correction in Indentation and coding style
ed9db4c [bilna] Merge remote-tracking branch 'upstream/master'
4b34ee7 [Bilna P] Update MQTTStreamSuite.scala
04503cf [bilna] Added embedded broker service for mqtt test
89d804e [bilna] Merge remote-tracking branch 'upstream/master'
fc8eb28 [bilna] Merge remote-tracking branch 'upstream/master'
4b58094 [Bilna P] Update MQTTStreamSuite.scala
b1ac4ad [bilna] Added BeforeAndAfter
5f6bfd2 [bilna] Added BeforeAndAfter
e8b6623 [Bilna P] Update MQTTStreamSuite.scala
5ca6691 [Bilna P] Update MQTTStreamSuite.scala
8616495 [bilna] [SPARK-4631] unit test for MQTT
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e767d7dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e767d7dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e767d7dd
Branch: refs/heads/master
Commit: e767d7ddac5c2330af553f2a74b8575dfc7afb67
Parents: 3fddc94
Author: bilna <bi...@am.amrita.edu>
Authored: Sun Jan 4 19:37:48 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 4 19:37:48 2015 -0800
----------------------------------------------------------------------
external/mqtt/pom.xml | 6 +
.../spark/streaming/mqtt/MQTTStreamSuite.scala | 110 ++++++++++++++++---
2 files changed, 101 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e767d7dd/external/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 9025915..d478267 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -66,6 +66,12 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>5.7.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
http://git-wip-us.apache.org/repos/asf/spark/blob/e767d7dd/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 84595ac..98fe6cb 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -17,31 +17,111 @@
package org.apache.spark.streaming.mqtt
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-class MQTTStreamSuite extends FunSuite {
-
- val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
+ private val batchDuration = Milliseconds(500)
private val master: String = "local[2]"
-
private val framework: String = this.getClass.getSimpleName
+ private val freePort = findFreePort() // probed once, when the suite instance is constructed
+ private val brokerUri = "//localhost:" + freePort // scheme-less; users prepend "tcp:" or "mqtt:"
+ private val topic = "def" // topic used both by the publisher and by the input stream
+ private var ssc: StreamingContext = _ // created in `before`, stopped in `after`
+ private val persistenceDir = Utils.createTempDir() // backs the publisher's file persistence; deleted in `after`
+ private var broker: BrokerService = _ // embedded broker, managed by setupMQTT()/tearDownMQTT()
+ private var connector: TransportConnector = _ // the broker's "mqtt" connector, managed alongside it
- test("mqtt input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val brokerUrl = "abc"
- val topic = "def"
+ before { // fixture set-up registered through ScalaTest's BeforeAndAfter
+ ssc = new StreamingContext(master, framework, batchDuration) // fresh streaming context for the test
+ setupMQTT() // start the embedded broker the test publishes to
+ }
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
- val test2: ReceiverInputDStream[String] =
- MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    // Fixture tear-down. The steps are chained with try/finally so that a failure in an
+    // earlier step (e.g. ssc.stop() throwing) cannot leave the embedded broker running on
+    // its port or skip the clean-up of the persistence directory.
+    try {
+      if (ssc != null) {
+        ssc.stop()
+        ssc = null
+      }
+    } finally {
+      try {
+        Utils.deleteRecursively(persistenceDir)
+      } finally {
+        tearDownMQTT()
+      }
+    }
+  }
- // TODO: Actually test receiving data
+  // End-to-end check: a message published to the embedded broker must show up in the
+  // DStream created by MQTTUtils.createStream.
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    // Updated from the foreachRDD callback and polled by the test thread inside
+    // `eventually`, so it is @volatile to make each update visible to the poller.
+    @volatile var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      // Materialise the batch once instead of running one job for the emptiness check
+      // and a second one to fetch the first element.
+      val batch = rdd.collect()
+      if (batch.nonEmpty) {
+        receiveMessage = receiveMessage ::: List(batch.head)
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      // headOption keeps the "nothing received yet" case an ordinary assertion failure
+      // rather than an IndexOutOfBoundsException from indexing an empty list.
+      assert(receiveMessage.headOption === Some(sendMessage))
+    }
     ssc.stop()
   }
+
+  /**
+   * Creates and starts the embedded broker used by the test.
+   *
+   * A new BrokerService and a TransportConnector named "mqtt" are stored in the `broker`
+   * and `connector` fields; the connector's URI is "mqtt:" followed by `brokerUri`.
+   */
+  private def setupMQTT(): Unit = {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    val mqttEndpoint = new URI("mqtt:" + brokerUri)
+    connector.setUri(mqttEndpoint)
+    // The connector has to be registered before the broker is started.
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  /**
+   * Stops the embedded broker and its connector and clears both fields.
+   *
+   * The connector is still stopped, and both fields are still reset, when stopping the
+   * broker throws; the first exception raised propagates to the caller unless a later
+   * stop also throws.
+   */
+  private def tearDownMQTT(): Unit = {
+    try {
+      if (broker != null) {
+        broker.stop()
+      }
+    } finally {
+      broker = null
+      try {
+        if (connector != null) {
+          connector.stop()
+        }
+      } finally {
+        connector = null
+      }
+    }
+  }
+
+  /**
+   * Returns a port on which a ServerSocket could be opened, starting the search at 23456.
+   *
+   * NOTE(review): the probe socket is closed before the port is handed out, so another
+   * process could grab the port before the broker binds to it.
+   */
+  private def findFreePort(): Int = {
+    // Probe: bind to the candidate port and release it again right away. A failed bind
+    // throws; presumably Utils.startServiceOnPort then retries with another port (verify
+    // against its implementation).
+    val probe = (candidatePort: Int) => {
+      val probeSocket = new ServerSocket(candidatePort)
+      probeSocket.close()
+      (null, candidatePort)
+    }
+    val (_, boundPort) = Utils.startServiceOnPort(23456, probe)
+    boundPort
+  }
+
+  /**
+   * Publishes `data` (UTF-8 encoded) to `topic` on the embedded broker.
+   *
+   * The same retained, QoS 1 message is published 101 times; presumably the repetition
+   * is there so that at least one copy arrives once the receiver has subscribed (confirm).
+   *
+   * @param data the message payload to publish
+   */
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence =
+        new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100) {
+          msgTopic.publish(message)
+        }
+      }
+    } finally {
+      // `client` is still null when one of the constructors above threw; dereferencing it
+      // here would replace the real failure with a NullPointerException.
+      if (client != null) {
+        // Only disconnect a client that is actually connected, so that a failed connect()
+        // is not masked by an exception raised from disconnect().
+        if (client.isConnected) {
+          client.disconnect()
+        }
+        client.close()
+      }
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org